14325: Merge branch 'master'
[arvados.git] / lib / dispatchcloud / test / queue.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package test
6
7 import (
8         "fmt"
9         "sync"
10         "time"
11
12         "git.curoverse.com/arvados.git/lib/dispatchcloud/container"
13         "git.curoverse.com/arvados.git/sdk/go/arvados"
14 )
15
16 // Queue is a test stub for container.Queue. The caller specifies the
17 // initial queue state.
18 type Queue struct {
19         // Containers represent the API server database contents.
20         Containers []arvados.Container
21
22         // ChooseType will be called for each entry in Containers. It
23         // must not be nil.
24         ChooseType func(*arvados.Container) (arvados.InstanceType, error)
25
26         entries     map[string]container.QueueEnt
27         updTime     time.Time
28         subscribers map[<-chan struct{}]chan struct{}
29
30         mtx sync.Mutex
31 }
32
33 // Entries returns the containers that were queued when Update was
34 // last called.
35 func (q *Queue) Entries() (map[string]container.QueueEnt, time.Time) {
36         q.mtx.Lock()
37         defer q.mtx.Unlock()
38         updTime := q.updTime
39         r := map[string]container.QueueEnt{}
40         for uuid, ent := range q.entries {
41                 r[uuid] = ent
42         }
43         return r, updTime
44 }
45
46 // Get returns the container from the cached queue, i.e., as it was
47 // when Update was last called -- just like a container.Queue does. If
48 // the state has been changed (via Lock, Unlock, or Cancel) since the
49 // last Update, the updated state is returned.
50 func (q *Queue) Get(uuid string) (arvados.Container, bool) {
51         q.mtx.Lock()
52         defer q.mtx.Unlock()
53         ent, ok := q.entries[uuid]
54         return ent.Container, ok
55 }
56
57 func (q *Queue) Forget(uuid string) {
58         q.mtx.Lock()
59         defer q.mtx.Unlock()
60         delete(q.entries, uuid)
61 }
62
63 func (q *Queue) Lock(uuid string) error {
64         q.mtx.Lock()
65         defer q.mtx.Unlock()
66         return q.changeState(uuid, arvados.ContainerStateQueued, arvados.ContainerStateLocked)
67 }
68
69 func (q *Queue) Unlock(uuid string) error {
70         q.mtx.Lock()
71         defer q.mtx.Unlock()
72         return q.changeState(uuid, arvados.ContainerStateLocked, arvados.ContainerStateQueued)
73 }
74
75 func (q *Queue) Cancel(uuid string) error {
76         q.mtx.Lock()
77         defer q.mtx.Unlock()
78         return q.changeState(uuid, q.entries[uuid].Container.State, arvados.ContainerStateCancelled)
79 }
80
81 func (q *Queue) Subscribe() <-chan struct{} {
82         q.mtx.Lock()
83         defer q.mtx.Unlock()
84         if q.subscribers == nil {
85                 q.subscribers = map[<-chan struct{}]chan struct{}{}
86         }
87         ch := make(chan struct{}, 1)
88         q.subscribers[ch] = ch
89         return ch
90 }
91
92 func (q *Queue) Unsubscribe(ch <-chan struct{}) {
93         q.mtx.Lock()
94         defer q.mtx.Unlock()
95         delete(q.subscribers, ch)
96 }
97
98 // caller must have lock.
99 func (q *Queue) notify() {
100         for _, ch := range q.subscribers {
101                 select {
102                 case ch <- struct{}{}:
103                 default:
104                 }
105         }
106 }
107
108 // caller must have lock.
109 func (q *Queue) changeState(uuid string, from, to arvados.ContainerState) error {
110         ent := q.entries[uuid]
111         if ent.Container.State != from {
112                 return fmt.Errorf("changeState failed: state=%q", ent.Container.State)
113         }
114         ent.Container.State = to
115         q.entries[uuid] = ent
116         for i, ctr := range q.Containers {
117                 if ctr.UUID == uuid {
118                         q.Containers[i].State = to
119                         break
120                 }
121         }
122         q.notify()
123         return nil
124 }
125
126 // Update rebuilds the current entries from the Containers slice.
127 func (q *Queue) Update() error {
128         q.mtx.Lock()
129         defer q.mtx.Unlock()
130         updTime := time.Now()
131         upd := map[string]container.QueueEnt{}
132         for _, ctr := range q.Containers {
133                 _, exists := q.entries[ctr.UUID]
134                 if !exists && (ctr.State == arvados.ContainerStateComplete || ctr.State == arvados.ContainerStateCancelled) {
135                         continue
136                 }
137                 if ent, ok := upd[ctr.UUID]; ok {
138                         ent.Container = ctr
139                         upd[ctr.UUID] = ent
140                 } else {
141                         it, _ := q.ChooseType(&ctr)
142                         upd[ctr.UUID] = container.QueueEnt{
143                                 Container:    ctr,
144                                 InstanceType: it,
145                         }
146                 }
147         }
148         q.entries = upd
149         q.updTime = updTime
150         q.notify()
151         return nil
152 }
153
154 // Notify adds/updates an entry in the Containers slice.  This
155 // simulates the effect of an API update from someone other than the
156 // dispatcher -- e.g., crunch-run updating state to "Complete" when a
157 // container exits.
158 //
159 // The resulting changes are not exposed through Get() or Entries()
160 // until the next call to Update().
161 func (q *Queue) Notify(upd arvados.Container) {
162         q.mtx.Lock()
163         defer q.mtx.Unlock()
164         for i, ctr := range q.Containers {
165                 if ctr.UUID == upd.UUID {
166                         q.Containers[i] = upd
167                         return
168                 }
169         }
170         q.Containers = append(q.Containers, upd)
171 }