Merge branch '17995-filter-by-comparing-attrs'
[arvados.git] / lib / dispatchcloud / scheduler / run_queue_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package scheduler
6
7 import (
8         "context"
9         "sync"
10         "time"
11
12         "git.arvados.org/arvados.git/lib/dispatchcloud/test"
13         "git.arvados.org/arvados.git/lib/dispatchcloud/worker"
14         "git.arvados.org/arvados.git/sdk/go/arvados"
15         "git.arvados.org/arvados.git/sdk/go/ctxlog"
16
17         "github.com/prometheus/client_golang/prometheus/testutil"
18
19         check "gopkg.in/check.v1"
20 )
21
22 var (
23         // arbitrary example container UUIDs
24         uuids = func() (r []string) {
25                 for i := 0; i < 16; i++ {
26                         r = append(r, test.ContainerUUID(i))
27                 }
28                 return
29         }()
30 )
31
// stubQuotaError wraps an error and identifies itself as a quota error
// through its IsQuotaError method.
type stubQuotaError struct {
	error
}

// IsQuotaError unconditionally reports true.
func (e stubQuotaError) IsQuotaError() bool {
	return true
}
37
38 type stubPool struct {
39         notify    <-chan struct{}
40         unalloc   map[arvados.InstanceType]int // idle+booting+unknown
41         idle      map[arvados.InstanceType]int
42         unknown   map[arvados.InstanceType]int
43         running   map[string]time.Time
44         quota     int
45         canCreate int
46         creates   []arvados.InstanceType
47         starts    []string
48         shutdowns int
49         sync.Mutex
50 }
51
52 func (p *stubPool) AtQuota() bool {
53         p.Lock()
54         defer p.Unlock()
55         return len(p.unalloc)+len(p.running)+len(p.unknown) >= p.quota
56 }
57 func (p *stubPool) Subscribe() <-chan struct{}  { return p.notify }
58 func (p *stubPool) Unsubscribe(<-chan struct{}) {}
59 func (p *stubPool) Running() map[string]time.Time {
60         p.Lock()
61         defer p.Unlock()
62         r := map[string]time.Time{}
63         for k, v := range p.running {
64                 r[k] = v
65         }
66         return r
67 }
68 func (p *stubPool) Unallocated() map[arvados.InstanceType]int {
69         p.Lock()
70         defer p.Unlock()
71         r := map[arvados.InstanceType]int{}
72         for it, n := range p.unalloc {
73                 r[it] = n - p.unknown[it]
74         }
75         return r
76 }
77 func (p *stubPool) Create(it arvados.InstanceType) bool {
78         p.Lock()
79         defer p.Unlock()
80         p.creates = append(p.creates, it)
81         if p.canCreate < 1 {
82                 return false
83         }
84         p.canCreate--
85         p.unalloc[it]++
86         return true
87 }
88 func (p *stubPool) ForgetContainer(uuid string) {
89 }
90 func (p *stubPool) KillContainer(uuid, reason string) bool {
91         p.Lock()
92         defer p.Unlock()
93         defer delete(p.running, uuid)
94         t, ok := p.running[uuid]
95         return ok && t.IsZero()
96 }
97 func (p *stubPool) Shutdown(arvados.InstanceType) bool {
98         p.shutdowns++
99         return false
100 }
101 func (p *stubPool) CountWorkers() map[worker.State]int {
102         p.Lock()
103         defer p.Unlock()
104         return map[worker.State]int{
105                 worker.StateBooting: len(p.unalloc) - len(p.idle),
106                 worker.StateIdle:    len(p.idle),
107                 worker.StateRunning: len(p.running),
108                 worker.StateUnknown: len(p.unknown),
109         }
110 }
111 func (p *stubPool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
112         p.Lock()
113         defer p.Unlock()
114         p.starts = append(p.starts, ctr.UUID)
115         if p.idle[it] == 0 {
116                 return false
117         }
118         p.idle[it]--
119         p.unalloc[it]--
120         p.running[ctr.UUID] = time.Time{}
121         return true
122 }
123
124 func chooseType(ctr *arvados.Container) (arvados.InstanceType, error) {
125         return test.InstanceType(ctr.RuntimeConstraints.VCPUs), nil
126 }
127
128 var _ = check.Suite(&SchedulerSuite{})
129
130 type SchedulerSuite struct{}
131
132 // Assign priority=4 container to idle node. Create new instances for
133 // the priority=3, 2, 1 containers.
134 func (*SchedulerSuite) TestUseIdleWorkers(c *check.C) {
135         ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
136         queue := test.Queue{
137                 ChooseType: chooseType,
138                 Containers: []arvados.Container{
139                         {
140                                 UUID:     test.ContainerUUID(1),
141                                 Priority: 1,
142                                 State:    arvados.ContainerStateLocked,
143                                 RuntimeConstraints: arvados.RuntimeConstraints{
144                                         VCPUs: 1,
145                                         RAM:   1 << 30,
146                                 },
147                         },
148                         {
149                                 UUID:     test.ContainerUUID(2),
150                                 Priority: 2,
151                                 State:    arvados.ContainerStateLocked,
152                                 RuntimeConstraints: arvados.RuntimeConstraints{
153                                         VCPUs: 1,
154                                         RAM:   1 << 30,
155                                 },
156                         },
157                         {
158                                 UUID:     test.ContainerUUID(3),
159                                 Priority: 3,
160                                 State:    arvados.ContainerStateLocked,
161                                 RuntimeConstraints: arvados.RuntimeConstraints{
162                                         VCPUs: 1,
163                                         RAM:   1 << 30,
164                                 },
165                         },
166                         {
167                                 UUID:     test.ContainerUUID(4),
168                                 Priority: 4,
169                                 State:    arvados.ContainerStateLocked,
170                                 RuntimeConstraints: arvados.RuntimeConstraints{
171                                         VCPUs: 1,
172                                         RAM:   1 << 30,
173                                 },
174                         },
175                 },
176         }
177         queue.Update()
178         pool := stubPool{
179                 quota: 1000,
180                 unalloc: map[arvados.InstanceType]int{
181                         test.InstanceType(1): 1,
182                         test.InstanceType(2): 2,
183                 },
184                 idle: map[arvados.InstanceType]int{
185                         test.InstanceType(1): 1,
186                         test.InstanceType(2): 2,
187                 },
188                 running:   map[string]time.Time{},
189                 canCreate: 0,
190         }
191         New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond).runQueue()
192         c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(1), test.InstanceType(1), test.InstanceType(1)})
193         c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4)})
194         c.Check(pool.running, check.HasLen, 1)
195         for uuid := range pool.running {
196                 c.Check(uuid, check.Equals, uuids[4])
197         }
198 }
199
200 // If pool.AtQuota() is true, shutdown some unalloc nodes, and don't
201 // call Create().
202 func (*SchedulerSuite) TestShutdownAtQuota(c *check.C) {
203         ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
204         for quota := 1; quota < 3; quota++ {
205                 c.Logf("quota=%d", quota)
206                 shouldCreate := []arvados.InstanceType{}
207                 for i := 1; i < quota; i++ {
208                         shouldCreate = append(shouldCreate, test.InstanceType(3))
209                 }
210                 queue := test.Queue{
211                         ChooseType: chooseType,
212                         Containers: []arvados.Container{
213                                 {
214                                         UUID:     test.ContainerUUID(2),
215                                         Priority: 2,
216                                         State:    arvados.ContainerStateLocked,
217                                         RuntimeConstraints: arvados.RuntimeConstraints{
218                                                 VCPUs: 2,
219                                                 RAM:   2 << 30,
220                                         },
221                                 },
222                                 {
223                                         UUID:     test.ContainerUUID(3),
224                                         Priority: 3,
225                                         State:    arvados.ContainerStateLocked,
226                                         RuntimeConstraints: arvados.RuntimeConstraints{
227                                                 VCPUs: 3,
228                                                 RAM:   3 << 30,
229                                         },
230                                 },
231                         },
232                 }
233                 queue.Update()
234                 pool := stubPool{
235                         quota: quota,
236                         unalloc: map[arvados.InstanceType]int{
237                                 test.InstanceType(2): 2,
238                         },
239                         idle: map[arvados.InstanceType]int{
240                                 test.InstanceType(2): 2,
241                         },
242                         running:   map[string]time.Time{},
243                         creates:   []arvados.InstanceType{},
244                         starts:    []string{},
245                         canCreate: 0,
246                 }
247                 sch := New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond)
248                 sch.runQueue()
249                 sch.sync()
250                 sch.runQueue()
251                 sch.sync()
252                 c.Check(pool.creates, check.DeepEquals, shouldCreate)
253                 if len(shouldCreate) == 0 {
254                         c.Check(pool.starts, check.DeepEquals, []string{})
255                 } else {
256                         c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(2)})
257                 }
258                 c.Check(pool.shutdowns, check.Equals, 3-quota)
259                 c.Check(queue.StateChanges(), check.DeepEquals, []test.QueueStateChange{
260                         {UUID: "zzzzz-dz642-000000000000003", From: "Locked", To: "Queued"},
261                         {UUID: "zzzzz-dz642-000000000000002", From: "Locked", To: "Queued"},
262                 })
263         }
264 }
265
// Don't flap lock/unlock when equal-priority containers compete for
// limited workers.
//
// (Unless we use FirstSeenAt as a secondary sort key, each runQueue()
// tends to choose a different one of the equal-priority containers as
// the "first" one that should be locked, and unlock the one it chose
// last time. This generates logging noise, and fails containers by
// reaching MaxDispatchAttempts quickly.)
func (*SchedulerSuite) TestEqualPriorityContainers(c *check.C) {
	logger := ctxlog.TestLogger(c)
	ctx := ctxlog.Context(context.Background(), logger)
	queue := test.Queue{
		ChooseType: chooseType,
		Logger:     logger,
	}
	// Eight queued containers with identical priority and identical
	// resource requirements (chooseType maps VCPUs=3 to InstanceType(3)).
	for i := 0; i < 8; i++ {
		queue.Containers = append(queue.Containers, arvados.Container{
			UUID:     test.ContainerUUID(i),
			Priority: 333,
			State:    arvados.ContainerStateQueued,
			RuntimeConstraints: arvados.RuntimeConstraints{
				VCPUs: 3,
				RAM:   3 << 30,
			},
		})
	}
	queue.Update()
	// One idle InstanceType(3) worker, room to create one more, quota 2.
	pool := stubPool{
		quota: 2,
		unalloc: map[arvados.InstanceType]int{
			test.InstanceType(3): 1,
		},
		idle: map[arvados.InstanceType]int{
			test.InstanceType(3): 1,
		},
		running:   map[string]time.Time{},
		creates:   []arvados.InstanceType{},
		starts:    []string{},
		canCreate: 1,
	}
	sch := New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond)
	// Run many scheduling passes so that any lock/unlock flapping would
	// show up as repeated Queued transitions for the same container.
	for i := 0; i < 30; i++ {
		sch.runQueue()
		sch.sync()
		time.Sleep(time.Millisecond)
	}
	c.Check(pool.shutdowns, check.Equals, 0)
	c.Check(pool.starts, check.HasLen, 1)
	// Count transitions back to Queued per container. Any container
	// that was unlocked at all must have been unlocked exactly once.
	unlocked := map[string]int{}
	for _, chg := range queue.StateChanges() {
		if chg.To == arvados.ContainerStateQueued {
			unlocked[chg.UUID]++
		}
	}
	for uuid, count := range unlocked {
		c.Check(count, check.Equals, 1, check.Commentf("%s", uuid))
	}
}
324
// Start lower-priority containers while waiting for new/existing
// workers to come up for higher-priority containers.
//
// Pool setup:
//   - Two unallocated instances each of InstanceType(1) and
//     InstanceType(2).
//   - One instance of each type is idle.
//   - Room to create four more.
//
// chooseType maps VCPUs to the instance type, so containers 1-3 need
// InstanceType(1) and containers 4-6 need InstanceType(2).
func (*SchedulerSuite) TestStartWhileCreating(c *check.C) {
	ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
	pool := stubPool{
		quota: 1000,
		unalloc: map[arvados.InstanceType]int{
			test.InstanceType(1): 2,
			test.InstanceType(2): 2,
		},
		idle: map[arvados.InstanceType]int{
			test.InstanceType(1): 1,
			test.InstanceType(2): 1,
		},
		running:   map[string]time.Time{},
		canCreate: 4,
	}
	queue := test.Queue{
		ChooseType: chooseType,
		Containers: []arvados.Container{
			{
				// create a new worker
				UUID:     test.ContainerUUID(1),
				Priority: 1,
				State:    arvados.ContainerStateLocked,
				RuntimeConstraints: arvados.RuntimeConstraints{
					VCPUs: 1,
					RAM:   1 << 30,
				},
			},
			{
				// tentatively map to unalloc worker
				UUID:     test.ContainerUUID(2),
				Priority: 2,
				State:    arvados.ContainerStateLocked,
				RuntimeConstraints: arvados.RuntimeConstraints{
					VCPUs: 1,
					RAM:   1 << 30,
				},
			},
			{
				// start now on idle worker
				UUID:     test.ContainerUUID(3),
				Priority: 3,
				State:    arvados.ContainerStateLocked,
				RuntimeConstraints: arvados.RuntimeConstraints{
					VCPUs: 1,
					RAM:   1 << 30,
				},
			},
			{
				// create a new worker
				UUID:     test.ContainerUUID(4),
				Priority: 4,
				State:    arvados.ContainerStateLocked,
				RuntimeConstraints: arvados.RuntimeConstraints{
					VCPUs: 2,
					RAM:   2 << 30,
				},
			},
			{
				// tentatively map to unalloc worker
				UUID:     test.ContainerUUID(5),
				Priority: 5,
				State:    arvados.ContainerStateLocked,
				RuntimeConstraints: arvados.RuntimeConstraints{
					VCPUs: 2,
					RAM:   2 << 30,
				},
			},
			{
				// start now on idle worker
				UUID:     test.ContainerUUID(6),
				Priority: 6,
				State:    arvados.ContainerStateLocked,
				RuntimeConstraints: arvados.RuntimeConstraints{
					VCPUs: 2,
					RAM:   2 << 30,
				},
			},
		},
	}
	queue.Update()
	New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond).runQueue()
	// One new instance per type is requested, higher-priority type first.
	c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(2), test.InstanceType(1)})
	// StartContainer is attempted, in priority order, for each container
	// mapped to an existing (idle or unallocated) instance.
	c.Check(pool.starts, check.DeepEquals, []string{uuids[6], uuids[5], uuids[3], uuids[2]})
	// Convert pool.running into UUID => "has a non-zero start time".
	// Only the starts that found an idle instance (3 and 6) are present.
	running := map[string]bool{}
	for uuid, t := range pool.running {
		if t.IsZero() {
			running[uuid] = false
		} else {
			running[uuid] = true
		}
	}
	c.Check(running, check.DeepEquals, map[string]bool{uuids[3]: false, uuids[6]: false})
}
421
422 func (*SchedulerSuite) TestKillNonexistentContainer(c *check.C) {
423         ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
424         pool := stubPool{
425                 quota: 1000,
426                 unalloc: map[arvados.InstanceType]int{
427                         test.InstanceType(2): 0,
428                 },
429                 idle: map[arvados.InstanceType]int{
430                         test.InstanceType(2): 0,
431                 },
432                 running: map[string]time.Time{
433                         test.ContainerUUID(2): {},
434                 },
435         }
436         queue := test.Queue{
437                 ChooseType: chooseType,
438                 Containers: []arvados.Container{
439                         {
440                                 // create a new worker
441                                 UUID:     test.ContainerUUID(1),
442                                 Priority: 1,
443                                 State:    arvados.ContainerStateLocked,
444                                 RuntimeConstraints: arvados.RuntimeConstraints{
445                                         VCPUs: 1,
446                                         RAM:   1 << 30,
447                                 },
448                         },
449                 },
450         }
451         queue.Update()
452         sch := New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond)
453         c.Check(pool.running, check.HasLen, 1)
454         sch.sync()
455         for deadline := time.Now().Add(time.Second); len(pool.Running()) > 0 && time.Now().Before(deadline); time.Sleep(time.Millisecond) {
456         }
457         c.Check(pool.Running(), check.HasLen, 0)
458 }
459
// TestContainersMetrics checks three of the scheduler's container
// gauges after one runQueue pass against three different pools:
//   - mContainersAllocatedNotStarted
//   - mContainersNotAllocatedOverQuota
//   - mLongestWaitTimeSinceQueue
func (*SchedulerSuite) TestContainersMetrics(c *check.C) {
	ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
	// A single locked container created 10 seconds ago.
	queue := test.Queue{
		ChooseType: chooseType,
		Containers: []arvados.Container{
			{
				UUID:      test.ContainerUUID(1),
				Priority:  1,
				State:     arvados.ContainerStateLocked,
				CreatedAt: time.Now().Add(-10 * time.Second),
				RuntimeConstraints: arvados.RuntimeConstraints{
					VCPUs: 1,
					RAM:   1 << 30,
				},
			},
		},
	}
	queue.Update()

	// Create a pool with one unallocated (idle/booting/unknown) worker,
	// leaving `idle` and `unknown` unset (empty). In other words, this
	// worker is booting, so the container will be allocated but not
	// started yet.
	pool := stubPool{
		unalloc: map[arvados.InstanceType]int{test.InstanceType(1): 1},
	}
	sch := New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond)
	sch.runQueue()
	sch.updateMetrics()

	// The gauges are compared after int() truncation. The container was
	// created ~10s ago, so the wait time reads as 10.
	// NOTE(review): presumably the wait is measured from CreatedAt; confirm.
	c.Check(int(testutil.ToFloat64(sch.mContainersAllocatedNotStarted)), check.Equals, 1)
	c.Check(int(testutil.ToFloat64(sch.mContainersNotAllocatedOverQuota)), check.Equals, 0)
	c.Check(int(testutil.ToFloat64(sch.mLongestWaitTimeSinceQueue)), check.Equals, 10)

	// Create a pool without workers. The queued container will not be
	// started, and the 'over quota' metric will be 1 because no workers
	// are available and canCreate defaults to zero.
	pool = stubPool{}
	sch = New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond)
	sch.runQueue()
	sch.updateMetrics()

	c.Check(int(testutil.ToFloat64(sch.mContainersAllocatedNotStarted)), check.Equals, 0)
	c.Check(int(testutil.ToFloat64(sch.mContainersNotAllocatedOverQuota)), check.Equals, 1)
	c.Check(int(testutil.ToFloat64(sch.mLongestWaitTimeSinceQueue)), check.Equals, 10)

	// Reset the queue, and create a pool with an idle worker. The queued
	// container will be started immediately, so mLongestWaitTimeSinceQueue
	// should be zero.
	queue = test.Queue{
		ChooseType: chooseType,
		Containers: []arvados.Container{
			{
				UUID:      test.ContainerUUID(1),
				Priority:  1,
				State:     arvados.ContainerStateLocked,
				CreatedAt: time.Now().Add(-10 * time.Second),
				RuntimeConstraints: arvados.RuntimeConstraints{
					VCPUs: 1,
					RAM:   1 << 30,
				},
			},
		},
	}
	queue.Update()

	pool = stubPool{
		idle:    map[arvados.InstanceType]int{test.InstanceType(1): 1},
		unalloc: map[arvados.InstanceType]int{test.InstanceType(1): 1},
		running: map[string]time.Time{},
	}
	sch = New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond)
	sch.runQueue()
	sch.updateMetrics()

	c.Check(int(testutil.ToFloat64(sch.mLongestWaitTimeSinceQueue)), check.Equals, 0)
}