17995: Fix merge error.
[arvados.git] / lib / dispatchcloud / scheduler / run_queue_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package scheduler
6
7 import (
8         "context"
9         "sync"
10         "time"
11
12         "git.arvados.org/arvados.git/lib/dispatchcloud/test"
13         "git.arvados.org/arvados.git/lib/dispatchcloud/worker"
14         "git.arvados.org/arvados.git/sdk/go/arvados"
15         "git.arvados.org/arvados.git/sdk/go/ctxlog"
16
17         "github.com/prometheus/client_golang/prometheus/testutil"
18
19         check "gopkg.in/check.v1"
20 )
21
22 var (
23         // arbitrary example container UUIDs
24         uuids = func() (r []string) {
25                 for i := 0; i < 16; i++ {
26                         r = append(r, test.ContainerUUID(i))
27                 }
28                 return
29         }()
30 )
31
32 type stubQuotaError struct {
33         error
34 }
35
36 func (stubQuotaError) IsQuotaError() bool { return true }
37
38 type stubPool struct {
39         notify    <-chan struct{}
40         unalloc   map[arvados.InstanceType]int // idle+booting+unknown
41         idle      map[arvados.InstanceType]int
42         unknown   map[arvados.InstanceType]int
43         running   map[string]time.Time
44         quota     int
45         canCreate int
46         creates   []arvados.InstanceType
47         starts    []string
48         shutdowns int
49         sync.Mutex
50 }
51
52 func (p *stubPool) AtQuota() bool {
53         p.Lock()
54         defer p.Unlock()
55         return len(p.unalloc)+len(p.running)+len(p.unknown) >= p.quota
56 }
57 func (p *stubPool) Subscribe() <-chan struct{}  { return p.notify }
58 func (p *stubPool) Unsubscribe(<-chan struct{}) {}
59 func (p *stubPool) Running() map[string]time.Time {
60         p.Lock()
61         defer p.Unlock()
62         r := map[string]time.Time{}
63         for k, v := range p.running {
64                 r[k] = v
65         }
66         return r
67 }
68 func (p *stubPool) Unallocated() map[arvados.InstanceType]int {
69         p.Lock()
70         defer p.Unlock()
71         r := map[arvados.InstanceType]int{}
72         for it, n := range p.unalloc {
73                 r[it] = n - p.unknown[it]
74         }
75         return r
76 }
77 func (p *stubPool) Create(it arvados.InstanceType) bool {
78         p.Lock()
79         defer p.Unlock()
80         p.creates = append(p.creates, it)
81         if p.canCreate < 1 {
82                 return false
83         }
84         p.canCreate--
85         p.unalloc[it]++
86         return true
87 }
88 func (p *stubPool) ForgetContainer(uuid string) {
89 }
90 func (p *stubPool) KillContainer(uuid, reason string) bool {
91         p.Lock()
92         defer p.Unlock()
93         defer delete(p.running, uuid)
94         t, ok := p.running[uuid]
95         return ok && t.IsZero()
96 }
97 func (p *stubPool) Shutdown(arvados.InstanceType) bool {
98         p.shutdowns++
99         return false
100 }
101 func (p *stubPool) CountWorkers() map[worker.State]int {
102         p.Lock()
103         defer p.Unlock()
104         return map[worker.State]int{
105                 worker.StateBooting: len(p.unalloc) - len(p.idle),
106                 worker.StateIdle:    len(p.idle),
107                 worker.StateRunning: len(p.running),
108                 worker.StateUnknown: len(p.unknown),
109         }
110 }
111 func (p *stubPool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
112         p.Lock()
113         defer p.Unlock()
114         p.starts = append(p.starts, ctr.UUID)
115         if p.idle[it] == 0 {
116                 return false
117         }
118         p.idle[it]--
119         p.unalloc[it]--
120         p.running[ctr.UUID] = time.Time{}
121         return true
122 }
123
124 func chooseType(ctr *arvados.Container) (arvados.InstanceType, error) {
125         return test.InstanceType(ctr.RuntimeConstraints.VCPUs), nil
126 }
127
128 var _ = check.Suite(&SchedulerSuite{})
129
130 type SchedulerSuite struct{}
131
132 // Assign priority=4 container to idle node. Create new instances for
133 // the priority=3, 2, 1 containers.
134 func (*SchedulerSuite) TestUseIdleWorkers(c *check.C) {
135         ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
136         queue := test.Queue{
137                 ChooseType: chooseType,
138                 Containers: []arvados.Container{
139                         {
140                                 UUID:     test.ContainerUUID(1),
141                                 Priority: 1,
142                                 State:    arvados.ContainerStateLocked,
143                                 RuntimeConstraints: arvados.RuntimeConstraints{
144                                         VCPUs: 1,
145                                         RAM:   1 << 30,
146                                 },
147                         },
148                         {
149                                 UUID:     test.ContainerUUID(2),
150                                 Priority: 2,
151                                 State:    arvados.ContainerStateLocked,
152                                 RuntimeConstraints: arvados.RuntimeConstraints{
153                                         VCPUs: 1,
154                                         RAM:   1 << 30,
155                                 },
156                         },
157                         {
158                                 UUID:     test.ContainerUUID(3),
159                                 Priority: 3,
160                                 State:    arvados.ContainerStateLocked,
161                                 RuntimeConstraints: arvados.RuntimeConstraints{
162                                         VCPUs: 1,
163                                         RAM:   1 << 30,
164                                 },
165                         },
166                         {
167                                 UUID:     test.ContainerUUID(4),
168                                 Priority: 4,
169                                 State:    arvados.ContainerStateLocked,
170                                 RuntimeConstraints: arvados.RuntimeConstraints{
171                                         VCPUs: 1,
172                                         RAM:   1 << 30,
173                                 },
174                         },
175                 },
176         }
177         queue.Update()
178         pool := stubPool{
179                 quota: 1000,
180                 unalloc: map[arvados.InstanceType]int{
181                         test.InstanceType(1): 1,
182                         test.InstanceType(2): 2,
183                 },
184                 idle: map[arvados.InstanceType]int{
185                         test.InstanceType(1): 1,
186                         test.InstanceType(2): 2,
187                 },
188                 running:   map[string]time.Time{},
189                 canCreate: 0,
190         }
191         New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond).runQueue()
192         c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(1), test.InstanceType(1), test.InstanceType(1)})
193         c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4)})
194         c.Check(pool.running, check.HasLen, 1)
195         for uuid := range pool.running {
196                 c.Check(uuid, check.Equals, uuids[4])
197         }
198 }
199
200 // If pool.AtQuota() is true, shutdown some unalloc nodes, and don't
201 // call Create().
202 func (*SchedulerSuite) TestShutdownAtQuota(c *check.C) {
203         ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
204         for quota := 1; quota < 3; quota++ {
205                 c.Logf("quota=%d", quota)
206                 shouldCreate := []arvados.InstanceType{}
207                 for i := 1; i < quota; i++ {
208                         shouldCreate = append(shouldCreate, test.InstanceType(3))
209                 }
210                 queue := test.Queue{
211                         ChooseType: chooseType,
212                         Containers: []arvados.Container{
213                                 {
214                                         UUID:     test.ContainerUUID(2),
215                                         Priority: 2,
216                                         State:    arvados.ContainerStateLocked,
217                                         RuntimeConstraints: arvados.RuntimeConstraints{
218                                                 VCPUs: 2,
219                                                 RAM:   2 << 30,
220                                         },
221                                 },
222                                 {
223                                         UUID:     test.ContainerUUID(3),
224                                         Priority: 3,
225                                         State:    arvados.ContainerStateLocked,
226                                         RuntimeConstraints: arvados.RuntimeConstraints{
227                                                 VCPUs: 3,
228                                                 RAM:   3 << 30,
229                                         },
230                                 },
231                         },
232                 }
233                 queue.Update()
234                 pool := stubPool{
235                         quota: quota,
236                         unalloc: map[arvados.InstanceType]int{
237                                 test.InstanceType(2): 2,
238                         },
239                         idle: map[arvados.InstanceType]int{
240                                 test.InstanceType(2): 2,
241                         },
242                         running:   map[string]time.Time{},
243                         creates:   []arvados.InstanceType{},
244                         starts:    []string{},
245                         canCreate: 0,
246                 }
247                 New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond).runQueue()
248                 c.Check(pool.creates, check.DeepEquals, shouldCreate)
249                 if len(shouldCreate) == 0 {
250                         c.Check(pool.starts, check.DeepEquals, []string{})
251                         c.Check(pool.shutdowns, check.Not(check.Equals), 0)
252                 } else {
253                         c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(2)})
254                         c.Check(pool.shutdowns, check.Equals, 0)
255                 }
256         }
257 }
258
259 // Start lower-priority containers while waiting for new/existing
260 // workers to come up for higher-priority containers.
261 func (*SchedulerSuite) TestStartWhileCreating(c *check.C) {
262         ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
263         pool := stubPool{
264                 quota: 1000,
265                 unalloc: map[arvados.InstanceType]int{
266                         test.InstanceType(1): 2,
267                         test.InstanceType(2): 2,
268                 },
269                 idle: map[arvados.InstanceType]int{
270                         test.InstanceType(1): 1,
271                         test.InstanceType(2): 1,
272                 },
273                 running:   map[string]time.Time{},
274                 canCreate: 4,
275         }
276         queue := test.Queue{
277                 ChooseType: chooseType,
278                 Containers: []arvados.Container{
279                         {
280                                 // create a new worker
281                                 UUID:     test.ContainerUUID(1),
282                                 Priority: 1,
283                                 State:    arvados.ContainerStateLocked,
284                                 RuntimeConstraints: arvados.RuntimeConstraints{
285                                         VCPUs: 1,
286                                         RAM:   1 << 30,
287                                 },
288                         },
289                         {
290                                 // tentatively map to unalloc worker
291                                 UUID:     test.ContainerUUID(2),
292                                 Priority: 2,
293                                 State:    arvados.ContainerStateLocked,
294                                 RuntimeConstraints: arvados.RuntimeConstraints{
295                                         VCPUs: 1,
296                                         RAM:   1 << 30,
297                                 },
298                         },
299                         {
300                                 // start now on idle worker
301                                 UUID:     test.ContainerUUID(3),
302                                 Priority: 3,
303                                 State:    arvados.ContainerStateLocked,
304                                 RuntimeConstraints: arvados.RuntimeConstraints{
305                                         VCPUs: 1,
306                                         RAM:   1 << 30,
307                                 },
308                         },
309                         {
310                                 // create a new worker
311                                 UUID:     test.ContainerUUID(4),
312                                 Priority: 4,
313                                 State:    arvados.ContainerStateLocked,
314                                 RuntimeConstraints: arvados.RuntimeConstraints{
315                                         VCPUs: 2,
316                                         RAM:   2 << 30,
317                                 },
318                         },
319                         {
320                                 // tentatively map to unalloc worker
321                                 UUID:     test.ContainerUUID(5),
322                                 Priority: 5,
323                                 State:    arvados.ContainerStateLocked,
324                                 RuntimeConstraints: arvados.RuntimeConstraints{
325                                         VCPUs: 2,
326                                         RAM:   2 << 30,
327                                 },
328                         },
329                         {
330                                 // start now on idle worker
331                                 UUID:     test.ContainerUUID(6),
332                                 Priority: 6,
333                                 State:    arvados.ContainerStateLocked,
334                                 RuntimeConstraints: arvados.RuntimeConstraints{
335                                         VCPUs: 2,
336                                         RAM:   2 << 30,
337                                 },
338                         },
339                 },
340         }
341         queue.Update()
342         New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond).runQueue()
343         c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(2), test.InstanceType(1)})
344         c.Check(pool.starts, check.DeepEquals, []string{uuids[6], uuids[5], uuids[3], uuids[2]})
345         running := map[string]bool{}
346         for uuid, t := range pool.running {
347                 if t.IsZero() {
348                         running[uuid] = false
349                 } else {
350                         running[uuid] = true
351                 }
352         }
353         c.Check(running, check.DeepEquals, map[string]bool{uuids[3]: false, uuids[6]: false})
354 }
355
356 func (*SchedulerSuite) TestKillNonexistentContainer(c *check.C) {
357         ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
358         pool := stubPool{
359                 quota: 1000,
360                 unalloc: map[arvados.InstanceType]int{
361                         test.InstanceType(2): 0,
362                 },
363                 idle: map[arvados.InstanceType]int{
364                         test.InstanceType(2): 0,
365                 },
366                 running: map[string]time.Time{
367                         test.ContainerUUID(2): {},
368                 },
369         }
370         queue := test.Queue{
371                 ChooseType: chooseType,
372                 Containers: []arvados.Container{
373                         {
374                                 // create a new worker
375                                 UUID:     test.ContainerUUID(1),
376                                 Priority: 1,
377                                 State:    arvados.ContainerStateLocked,
378                                 RuntimeConstraints: arvados.RuntimeConstraints{
379                                         VCPUs: 1,
380                                         RAM:   1 << 30,
381                                 },
382                         },
383                 },
384         }
385         queue.Update()
386         sch := New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond)
387         c.Check(pool.running, check.HasLen, 1)
388         sch.sync()
389         for deadline := time.Now().Add(time.Second); len(pool.Running()) > 0 && time.Now().Before(deadline); time.Sleep(time.Millisecond) {
390         }
391         c.Check(pool.Running(), check.HasLen, 0)
392 }
393
394 func (*SchedulerSuite) TestContainersMetrics(c *check.C) {
395         ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
396         queue := test.Queue{
397                 ChooseType: chooseType,
398                 Containers: []arvados.Container{
399                         {
400                                 UUID:      test.ContainerUUID(1),
401                                 Priority:  1,
402                                 State:     arvados.ContainerStateLocked,
403                                 CreatedAt: time.Now().Add(-10 * time.Second),
404                                 RuntimeConstraints: arvados.RuntimeConstraints{
405                                         VCPUs: 1,
406                                         RAM:   1 << 30,
407                                 },
408                         },
409                 },
410         }
411         queue.Update()
412
413         // Create a pool with one unallocated (idle/booting/unknown) worker,
414         // and `idle` and `unknown` not set (empty). Iow this worker is in the booting
415         // state, and the container will be allocated but not started yet.
416         pool := stubPool{
417                 unalloc: map[arvados.InstanceType]int{test.InstanceType(1): 1},
418         }
419         sch := New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond)
420         sch.runQueue()
421         sch.updateMetrics()
422
423         c.Check(int(testutil.ToFloat64(sch.mContainersAllocatedNotStarted)), check.Equals, 1)
424         c.Check(int(testutil.ToFloat64(sch.mContainersNotAllocatedOverQuota)), check.Equals, 0)
425         c.Check(int(testutil.ToFloat64(sch.mLongestWaitTimeSinceQueue)), check.Equals, 10)
426
427         // Create a pool without workers. The queued container will not be started, and the
428         // 'over quota' metric will be 1 because no workers are available and canCreate defaults
429         // to zero.
430         pool = stubPool{}
431         sch = New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond)
432         sch.runQueue()
433         sch.updateMetrics()
434
435         c.Check(int(testutil.ToFloat64(sch.mContainersAllocatedNotStarted)), check.Equals, 0)
436         c.Check(int(testutil.ToFloat64(sch.mContainersNotAllocatedOverQuota)), check.Equals, 1)
437         c.Check(int(testutil.ToFloat64(sch.mLongestWaitTimeSinceQueue)), check.Equals, 10)
438
439         // Reset the queue, and create a pool with an idle worker. The queued
440         // container will be started immediately and mLongestWaitTimeSinceQueue
441         // should be zero.
442         queue = test.Queue{
443                 ChooseType: chooseType,
444                 Containers: []arvados.Container{
445                         {
446                                 UUID:      test.ContainerUUID(1),
447                                 Priority:  1,
448                                 State:     arvados.ContainerStateLocked,
449                                 CreatedAt: time.Now().Add(-10 * time.Second),
450                                 RuntimeConstraints: arvados.RuntimeConstraints{
451                                         VCPUs: 1,
452                                         RAM:   1 << 30,
453                                 },
454                         },
455                 },
456         }
457         queue.Update()
458
459         pool = stubPool{
460                 idle:    map[arvados.InstanceType]int{test.InstanceType(1): 1},
461                 unalloc: map[arvados.InstanceType]int{test.InstanceType(1): 1},
462                 running: map[string]time.Time{},
463         }
464         sch = New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond)
465         sch.runQueue()
466         sch.updateMetrics()
467
468         c.Check(int(testutil.ToFloat64(sch.mLongestWaitTimeSinceQueue)), check.Equals, 0)
469 }