20457: Exercise quota handling in dispatcher chaos test.
[arvados.git] / lib / dispatchcloud / scheduler / run_queue_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package scheduler
6
7 import (
8         "context"
9         "sync"
10         "time"
11
12         "git.arvados.org/arvados.git/lib/dispatchcloud/test"
13         "git.arvados.org/arvados.git/lib/dispatchcloud/worker"
14         "git.arvados.org/arvados.git/sdk/go/arvados"
15         "git.arvados.org/arvados.git/sdk/go/ctxlog"
16
17         "github.com/prometheus/client_golang/prometheus/testutil"
18
19         check "gopkg.in/check.v1"
20 )
21
22 var (
23         // arbitrary example container UUIDs
24         uuids = func() (r []string) {
25                 for i := 0; i < 16; i++ {
26                         r = append(r, test.ContainerUUID(i))
27                 }
28                 return
29         }()
30 )
31
32 type stubPool struct {
33         notify    <-chan struct{}
34         unalloc   map[arvados.InstanceType]int // idle+booting+unknown
35         idle      map[arvados.InstanceType]int
36         unknown   map[arvados.InstanceType]int
37         running   map[string]time.Time
38         quota     int
39         canCreate int
40         creates   []arvados.InstanceType
41         starts    []string
42         shutdowns int
43         sync.Mutex
44 }
45
46 func (p *stubPool) AtQuota() bool {
47         p.Lock()
48         defer p.Unlock()
49         n := len(p.running)
50         for _, nn := range p.unalloc {
51                 n += nn
52         }
53         for _, nn := range p.unknown {
54                 n += nn
55         }
56         return n >= p.quota
57 }
58 func (p *stubPool) Subscribe() <-chan struct{}  { return p.notify }
59 func (p *stubPool) Unsubscribe(<-chan struct{}) {}
60 func (p *stubPool) Running() map[string]time.Time {
61         p.Lock()
62         defer p.Unlock()
63         r := map[string]time.Time{}
64         for k, v := range p.running {
65                 r[k] = v
66         }
67         return r
68 }
69 func (p *stubPool) Unallocated() map[arvados.InstanceType]int {
70         p.Lock()
71         defer p.Unlock()
72         r := map[arvados.InstanceType]int{}
73         for it, n := range p.unalloc {
74                 r[it] = n - p.unknown[it]
75         }
76         return r
77 }
78 func (p *stubPool) Create(it arvados.InstanceType) bool {
79         p.Lock()
80         defer p.Unlock()
81         p.creates = append(p.creates, it)
82         if p.canCreate < 1 {
83                 return false
84         }
85         p.canCreate--
86         p.unalloc[it]++
87         return true
88 }
89 func (p *stubPool) ForgetContainer(uuid string) {
90 }
91 func (p *stubPool) KillContainer(uuid, reason string) bool {
92         p.Lock()
93         defer p.Unlock()
94         defer delete(p.running, uuid)
95         t, ok := p.running[uuid]
96         return ok && t.IsZero()
97 }
98 func (p *stubPool) Shutdown(arvados.InstanceType) bool {
99         p.shutdowns++
100         return false
101 }
102 func (p *stubPool) CountWorkers() map[worker.State]int {
103         p.Lock()
104         defer p.Unlock()
105         return map[worker.State]int{
106                 worker.StateBooting: len(p.unalloc) - len(p.idle),
107                 worker.StateIdle:    len(p.idle),
108                 worker.StateRunning: len(p.running),
109                 worker.StateUnknown: len(p.unknown),
110         }
111 }
112 func (p *stubPool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
113         p.Lock()
114         defer p.Unlock()
115         p.starts = append(p.starts, ctr.UUID)
116         if p.idle[it] == 0 {
117                 return false
118         }
119         p.idle[it]--
120         p.unalloc[it]--
121         p.running[ctr.UUID] = time.Time{}
122         return true
123 }
124
125 func chooseType(ctr *arvados.Container) (arvados.InstanceType, error) {
126         return test.InstanceType(ctr.RuntimeConstraints.VCPUs), nil
127 }
128
129 var _ = check.Suite(&SchedulerSuite{})
130
131 type SchedulerSuite struct{}
132
133 // Assign priority=4 container to idle node. Create new instances for
134 // the priority=3, 2, 1 containers.
135 func (*SchedulerSuite) TestUseIdleWorkers(c *check.C) {
136         ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
137         queue := test.Queue{
138                 ChooseType: chooseType,
139                 Containers: []arvados.Container{
140                         {
141                                 UUID:     test.ContainerUUID(1),
142                                 Priority: 1,
143                                 State:    arvados.ContainerStateLocked,
144                                 RuntimeConstraints: arvados.RuntimeConstraints{
145                                         VCPUs: 1,
146                                         RAM:   1 << 30,
147                                 },
148                         },
149                         {
150                                 UUID:     test.ContainerUUID(2),
151                                 Priority: 2,
152                                 State:    arvados.ContainerStateLocked,
153                                 RuntimeConstraints: arvados.RuntimeConstraints{
154                                         VCPUs: 1,
155                                         RAM:   1 << 30,
156                                 },
157                         },
158                         {
159                                 UUID:     test.ContainerUUID(3),
160                                 Priority: 3,
161                                 State:    arvados.ContainerStateLocked,
162                                 RuntimeConstraints: arvados.RuntimeConstraints{
163                                         VCPUs: 1,
164                                         RAM:   1 << 30,
165                                 },
166                         },
167                         {
168                                 UUID:     test.ContainerUUID(4),
169                                 Priority: 4,
170                                 State:    arvados.ContainerStateLocked,
171                                 RuntimeConstraints: arvados.RuntimeConstraints{
172                                         VCPUs: 1,
173                                         RAM:   1 << 30,
174                                 },
175                         },
176                 },
177         }
178         queue.Update()
179         pool := stubPool{
180                 quota: 1000,
181                 unalloc: map[arvados.InstanceType]int{
182                         test.InstanceType(1): 1,
183                         test.InstanceType(2): 2,
184                 },
185                 idle: map[arvados.InstanceType]int{
186                         test.InstanceType(1): 1,
187                         test.InstanceType(2): 2,
188                 },
189                 running:   map[string]time.Time{},
190                 canCreate: 0,
191         }
192         New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 0, 0).runQueue()
193         c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(1), test.InstanceType(1), test.InstanceType(1)})
194         c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4)})
195         c.Check(pool.running, check.HasLen, 1)
196         for uuid := range pool.running {
197                 c.Check(uuid, check.Equals, uuids[4])
198         }
199 }
200
201 // If pool.AtQuota() is true, shutdown some unalloc nodes, and don't
202 // call Create().
203 func (*SchedulerSuite) TestShutdownAtQuota(c *check.C) {
204         ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
205         for quota := 1; quota <= 3; quota++ {
206                 c.Logf("quota=%d", quota)
207                 queue := test.Queue{
208                         ChooseType: chooseType,
209                         Containers: []arvados.Container{
210                                 {
211                                         UUID:     test.ContainerUUID(2),
212                                         Priority: 2,
213                                         State:    arvados.ContainerStateLocked,
214                                         RuntimeConstraints: arvados.RuntimeConstraints{
215                                                 VCPUs: 2,
216                                                 RAM:   2 << 30,
217                                         },
218                                 },
219                                 {
220                                         UUID:     test.ContainerUUID(3),
221                                         Priority: 3,
222                                         State:    arvados.ContainerStateLocked,
223                                         RuntimeConstraints: arvados.RuntimeConstraints{
224                                                 VCPUs: 3,
225                                                 RAM:   3 << 30,
226                                         },
227                                 },
228                         },
229                 }
230                 queue.Update()
231                 pool := stubPool{
232                         quota: quota,
233                         unalloc: map[arvados.InstanceType]int{
234                                 test.InstanceType(2): 2,
235                         },
236                         idle: map[arvados.InstanceType]int{
237                                 test.InstanceType(2): 2,
238                         },
239                         running:   map[string]time.Time{},
240                         creates:   []arvados.InstanceType{},
241                         starts:    []string{},
242                         canCreate: 0,
243                 }
244                 sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 0, 0)
245                 sch.sync()
246                 sch.runQueue()
247                 sch.sync()
248                 switch quota {
249                 case 1, 2:
250                         // Can't create a type3 node for ctr3, so we
251                         // shutdown an unallocated node (type2), and
252                         // unlock both containers.
253                         c.Check(pool.starts, check.HasLen, 0)
254                         c.Check(pool.shutdowns, check.Equals, 1)
255                         c.Check(pool.creates, check.HasLen, 0)
256                         c.Check(queue.StateChanges(), check.DeepEquals, []test.QueueStateChange{
257                                 {UUID: test.ContainerUUID(3), From: "Locked", To: "Queued"},
258                                 {UUID: test.ContainerUUID(2), From: "Locked", To: "Queued"},
259                         })
260                 case 3:
261                         // Creating a type3 instance works, so we
262                         // start ctr2 on a type2 instance, and leave
263                         // ctr3 locked while we wait for the new
264                         // instance to come up.
265                         c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(2)})
266                         c.Check(pool.shutdowns, check.Equals, 0)
267                         c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(3)})
268                         c.Check(queue.StateChanges(), check.HasLen, 0)
269                 default:
270                         panic("test not written for quota>3")
271                 }
272         }
273 }
274
275 // Don't unlock containers or shutdown unalloc (booting/idle) nodes
276 // just because some 503 errors caused us to reduce maxConcurrency
277 // below the current load level.
278 //
279 // We expect to raise maxConcurrency soon when we stop seeing 503s. If
280 // that doesn't happen soon, the idle timeout will take care of the
281 // excess nodes.
282 func (*SchedulerSuite) TestIdleIn503QuietPeriod(c *check.C) {
283         ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
284         queue := test.Queue{
285                 ChooseType: chooseType,
286                 Containers: []arvados.Container{
287                         // scheduled on an instance (but not Running yet)
288                         {
289                                 UUID:     test.ContainerUUID(1),
290                                 Priority: 1000,
291                                 State:    arvados.ContainerStateLocked,
292                                 RuntimeConstraints: arvados.RuntimeConstraints{
293                                         VCPUs: 2,
294                                         RAM:   2 << 30,
295                                 },
296                         },
297                         // not yet scheduled
298                         {
299                                 UUID:     test.ContainerUUID(2),
300                                 Priority: 1000,
301                                 State:    arvados.ContainerStateLocked,
302                                 RuntimeConstraints: arvados.RuntimeConstraints{
303                                         VCPUs: 2,
304                                         RAM:   2 << 30,
305                                 },
306                         },
307                         // scheduled on an instance (but not Running yet)
308                         {
309                                 UUID:     test.ContainerUUID(3),
310                                 Priority: 1000,
311                                 State:    arvados.ContainerStateLocked,
312                                 RuntimeConstraints: arvados.RuntimeConstraints{
313                                         VCPUs: 3,
314                                         RAM:   3 << 30,
315                                 },
316                         },
317                         // not yet scheduled
318                         {
319                                 UUID:     test.ContainerUUID(4),
320                                 Priority: 1000,
321                                 State:    arvados.ContainerStateLocked,
322                                 RuntimeConstraints: arvados.RuntimeConstraints{
323                                         VCPUs: 3,
324                                         RAM:   3 << 30,
325                                 },
326                         },
327                         // not yet locked
328                         {
329                                 UUID:     test.ContainerUUID(5),
330                                 Priority: 1000,
331                                 State:    arvados.ContainerStateQueued,
332                                 RuntimeConstraints: arvados.RuntimeConstraints{
333                                         VCPUs: 3,
334                                         RAM:   3 << 30,
335                                 },
336                         },
337                 },
338         }
339         queue.Update()
340         pool := stubPool{
341                 quota: 16,
342                 unalloc: map[arvados.InstanceType]int{
343                         test.InstanceType(2): 2,
344                         test.InstanceType(3): 2,
345                 },
346                 idle: map[arvados.InstanceType]int{
347                         test.InstanceType(2): 1,
348                         test.InstanceType(3): 1,
349                 },
350                 running: map[string]time.Time{
351                         test.ContainerUUID(1): {},
352                         test.ContainerUUID(3): {},
353                 },
354                 creates:   []arvados.InstanceType{},
355                 starts:    []string{},
356                 canCreate: 0,
357         }
358         sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 0, 0)
359         sch.last503time = time.Now()
360         sch.maxConcurrency = 3
361         sch.sync()
362         sch.runQueue()
363         sch.sync()
364
365         c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(2)})
366         c.Check(pool.shutdowns, check.Equals, 0)
367         c.Check(pool.creates, check.HasLen, 0)
368         c.Check(queue.StateChanges(), check.HasLen, 0)
369 }
370
371 // If we somehow have more supervisor containers in Locked state than
372 // we should (e.g., config changed since they started), and some
373 // appropriate-sized instances booting up, unlock the excess
374 // supervisor containers, but let the instances keep booting.
375 func (*SchedulerSuite) TestUnlockExcessSupervisors(c *check.C) {
376         ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
377         queue := test.Queue{
378                 ChooseType: chooseType,
379         }
380         for i := 1; i <= 6; i++ {
381                 queue.Containers = append(queue.Containers, arvados.Container{
382                         UUID:     test.ContainerUUID(i),
383                         Priority: int64(1000 - i),
384                         State:    arvados.ContainerStateLocked,
385                         RuntimeConstraints: arvados.RuntimeConstraints{
386                                 VCPUs: 2,
387                                 RAM:   2 << 30,
388                         },
389                         SchedulingParameters: arvados.SchedulingParameters{
390                                 Supervisor: true,
391                         },
392                 })
393         }
394         queue.Update()
395         pool := stubPool{
396                 quota: 16,
397                 unalloc: map[arvados.InstanceType]int{
398                         test.InstanceType(2): 2,
399                 },
400                 idle: map[arvados.InstanceType]int{
401                         test.InstanceType(2): 1,
402                 },
403                 running: map[string]time.Time{
404                         test.ContainerUUID(1): {},
405                         test.ContainerUUID(2): {},
406                         test.ContainerUUID(3): {},
407                         test.ContainerUUID(4): {},
408                 },
409                 creates:   []arvados.InstanceType{},
410                 starts:    []string{},
411                 canCreate: 0,
412         }
413         sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 8, 0.5)
414         sch.sync()
415         sch.runQueue()
416         sch.sync()
417
418         c.Check(pool.starts, check.DeepEquals, []string{})
419         c.Check(pool.shutdowns, check.Equals, 0)
420         c.Check(pool.creates, check.HasLen, 0)
421         c.Check(queue.StateChanges(), check.DeepEquals, []test.QueueStateChange{
422                 {UUID: test.ContainerUUID(5), From: "Locked", To: "Queued"},
423                 {UUID: test.ContainerUUID(6), From: "Locked", To: "Queued"},
424         })
425 }
426
427 // Assuming we're not at quota, don't try to shutdown idle nodes
428 // merely because we have more queued/locked supervisor containers
429 // than MaxSupervisors -- it won't help.
430 func (*SchedulerSuite) TestExcessSupervisors(c *check.C) {
431         ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
432         queue := test.Queue{
433                 ChooseType: chooseType,
434         }
435         for i := 1; i <= 8; i++ {
436                 queue.Containers = append(queue.Containers, arvados.Container{
437                         UUID:     test.ContainerUUID(i),
438                         Priority: int64(1000 + i),
439                         State:    arvados.ContainerStateQueued,
440                         RuntimeConstraints: arvados.RuntimeConstraints{
441                                 VCPUs: 2,
442                                 RAM:   2 << 30,
443                         },
444                         SchedulingParameters: arvados.SchedulingParameters{
445                                 Supervisor: true,
446                         },
447                 })
448         }
449         for i := 2; i < 4; i++ {
450                 queue.Containers[i].State = arvados.ContainerStateLocked
451         }
452         for i := 4; i < 6; i++ {
453                 queue.Containers[i].State = arvados.ContainerStateRunning
454         }
455         queue.Update()
456         pool := stubPool{
457                 quota: 16,
458                 unalloc: map[arvados.InstanceType]int{
459                         test.InstanceType(2): 2,
460                 },
461                 idle: map[arvados.InstanceType]int{
462                         test.InstanceType(2): 1,
463                 },
464                 running: map[string]time.Time{
465                         test.ContainerUUID(5): {},
466                         test.ContainerUUID(6): {},
467                 },
468                 creates:   []arvados.InstanceType{},
469                 starts:    []string{},
470                 canCreate: 0,
471         }
472         sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 8, 0.5)
473         sch.sync()
474         sch.runQueue()
475         sch.sync()
476
477         c.Check(pool.starts, check.HasLen, 2)
478         c.Check(pool.shutdowns, check.Equals, 0)
479         c.Check(pool.creates, check.HasLen, 0)
480         c.Check(queue.StateChanges(), check.HasLen, 0)
481 }
482
483 // Don't flap lock/unlock when equal-priority containers compete for
484 // limited workers.
485 //
486 // (Unless we use FirstSeenAt as a secondary sort key, each runQueue()
487 // tends to choose a different one of the equal-priority containers as
488 // the "first" one that should be locked, and unlock the one it chose
489 // last time. This generates logging noise, and fails containers by
490 // reaching MaxDispatchAttempts quickly.)
491 func (*SchedulerSuite) TestEqualPriorityContainers(c *check.C) {
492         logger := ctxlog.TestLogger(c)
493         ctx := ctxlog.Context(context.Background(), logger)
494         queue := test.Queue{
495                 ChooseType: chooseType,
496                 Logger:     logger,
497         }
498         for i := 0; i < 8; i++ {
499                 queue.Containers = append(queue.Containers, arvados.Container{
500                         UUID:     test.ContainerUUID(i),
501                         Priority: 333,
502                         State:    arvados.ContainerStateQueued,
503                         RuntimeConstraints: arvados.RuntimeConstraints{
504                                 VCPUs: 3,
505                                 RAM:   3 << 30,
506                         },
507                 })
508         }
509         queue.Update()
510         pool := stubPool{
511                 quota: 2,
512                 unalloc: map[arvados.InstanceType]int{
513                         test.InstanceType(3): 2,
514                 },
515                 idle: map[arvados.InstanceType]int{
516                         test.InstanceType(3): 2,
517                 },
518                 running:   map[string]time.Time{},
519                 creates:   []arvados.InstanceType{},
520                 starts:    []string{},
521                 canCreate: 0,
522         }
523         sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 0, 0)
524         for i := 0; i < 30; i++ {
525                 sch.runQueue()
526                 sch.sync()
527                 time.Sleep(time.Millisecond)
528         }
529         c.Check(pool.shutdowns, check.Equals, 0)
530         c.Check(pool.starts, check.HasLen, 2)
531         unlocked := map[string]int{}
532         for _, chg := range queue.StateChanges() {
533                 if chg.To == arvados.ContainerStateQueued {
534                         unlocked[chg.UUID]++
535                 }
536         }
537         for uuid, count := range unlocked {
538                 c.Check(count, check.Equals, 1, check.Commentf("%s", uuid))
539         }
540 }
541
542 // Start lower-priority containers while waiting for new/existing
543 // workers to come up for higher-priority containers.
544 func (*SchedulerSuite) TestStartWhileCreating(c *check.C) {
545         ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
546         pool := stubPool{
547                 quota: 1000,
548                 unalloc: map[arvados.InstanceType]int{
549                         test.InstanceType(1): 2,
550                         test.InstanceType(2): 2,
551                 },
552                 idle: map[arvados.InstanceType]int{
553                         test.InstanceType(1): 1,
554                         test.InstanceType(2): 1,
555                 },
556                 running:   map[string]time.Time{},
557                 canCreate: 4,
558         }
559         queue := test.Queue{
560                 ChooseType: chooseType,
561                 Containers: []arvados.Container{
562                         {
563                                 // create a new worker
564                                 UUID:     test.ContainerUUID(1),
565                                 Priority: 1,
566                                 State:    arvados.ContainerStateLocked,
567                                 RuntimeConstraints: arvados.RuntimeConstraints{
568                                         VCPUs: 1,
569                                         RAM:   1 << 30,
570                                 },
571                         },
572                         {
573                                 // tentatively map to unalloc worker
574                                 UUID:     test.ContainerUUID(2),
575                                 Priority: 2,
576                                 State:    arvados.ContainerStateLocked,
577                                 RuntimeConstraints: arvados.RuntimeConstraints{
578                                         VCPUs: 1,
579                                         RAM:   1 << 30,
580                                 },
581                         },
582                         {
583                                 // start now on idle worker
584                                 UUID:     test.ContainerUUID(3),
585                                 Priority: 3,
586                                 State:    arvados.ContainerStateLocked,
587                                 RuntimeConstraints: arvados.RuntimeConstraints{
588                                         VCPUs: 1,
589                                         RAM:   1 << 30,
590                                 },
591                         },
592                         {
593                                 // create a new worker
594                                 UUID:     test.ContainerUUID(4),
595                                 Priority: 4,
596                                 State:    arvados.ContainerStateLocked,
597                                 RuntimeConstraints: arvados.RuntimeConstraints{
598                                         VCPUs: 2,
599                                         RAM:   2 << 30,
600                                 },
601                         },
602                         {
603                                 // tentatively map to unalloc worker
604                                 UUID:     test.ContainerUUID(5),
605                                 Priority: 5,
606                                 State:    arvados.ContainerStateLocked,
607                                 RuntimeConstraints: arvados.RuntimeConstraints{
608                                         VCPUs: 2,
609                                         RAM:   2 << 30,
610                                 },
611                         },
612                         {
613                                 // start now on idle worker
614                                 UUID:     test.ContainerUUID(6),
615                                 Priority: 6,
616                                 State:    arvados.ContainerStateLocked,
617                                 RuntimeConstraints: arvados.RuntimeConstraints{
618                                         VCPUs: 2,
619                                         RAM:   2 << 30,
620                                 },
621                         },
622                 },
623         }
624         queue.Update()
625         New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 0, 0).runQueue()
626         c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(2), test.InstanceType(1)})
627         c.Check(pool.starts, check.DeepEquals, []string{uuids[6], uuids[5], uuids[3], uuids[2]})
628         running := map[string]bool{}
629         for uuid, t := range pool.running {
630                 if t.IsZero() {
631                         running[uuid] = false
632                 } else {
633                         running[uuid] = true
634                 }
635         }
636         c.Check(running, check.DeepEquals, map[string]bool{uuids[3]: false, uuids[6]: false})
637 }
638
639 func (*SchedulerSuite) TestKillNonexistentContainer(c *check.C) {
640         ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
641         pool := stubPool{
642                 quota: 1000,
643                 unalloc: map[arvados.InstanceType]int{
644                         test.InstanceType(2): 0,
645                 },
646                 idle: map[arvados.InstanceType]int{
647                         test.InstanceType(2): 0,
648                 },
649                 running: map[string]time.Time{
650                         test.ContainerUUID(2): {},
651                 },
652         }
653         queue := test.Queue{
654                 ChooseType: chooseType,
655                 Containers: []arvados.Container{
656                         {
657                                 // create a new worker
658                                 UUID:     test.ContainerUUID(1),
659                                 Priority: 1,
660                                 State:    arvados.ContainerStateLocked,
661                                 RuntimeConstraints: arvados.RuntimeConstraints{
662                                         VCPUs: 1,
663                                         RAM:   1 << 30,
664                                 },
665                         },
666                 },
667         }
668         queue.Update()
669         sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 0, 0)
670         c.Check(pool.running, check.HasLen, 1)
671         sch.sync()
672         for deadline := time.Now().Add(time.Second); len(pool.Running()) > 0 && time.Now().Before(deadline); time.Sleep(time.Millisecond) {
673         }
674         c.Check(pool.Running(), check.HasLen, 0)
675 }
676
677 func (*SchedulerSuite) TestContainersMetrics(c *check.C) {
678         ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
679         queue := test.Queue{
680                 ChooseType: chooseType,
681                 Containers: []arvados.Container{
682                         {
683                                 UUID:      test.ContainerUUID(1),
684                                 Priority:  1,
685                                 State:     arvados.ContainerStateLocked,
686                                 CreatedAt: time.Now().Add(-10 * time.Second),
687                                 RuntimeConstraints: arvados.RuntimeConstraints{
688                                         VCPUs: 1,
689                                         RAM:   1 << 30,
690                                 },
691                         },
692                 },
693         }
694         queue.Update()
695
696         // Create a pool with one unallocated (idle/booting/unknown) worker,
697         // and `idle` and `unknown` not set (empty). Iow this worker is in the booting
698         // state, and the container will be allocated but not started yet.
699         pool := stubPool{
700                 unalloc: map[arvados.InstanceType]int{test.InstanceType(1): 1},
701         }
702         sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 0, 0)
703         sch.runQueue()
704         sch.updateMetrics()
705
706         c.Check(int(testutil.ToFloat64(sch.mContainersAllocatedNotStarted)), check.Equals, 1)
707         c.Check(int(testutil.ToFloat64(sch.mContainersNotAllocatedOverQuota)), check.Equals, 0)
708         c.Check(int(testutil.ToFloat64(sch.mLongestWaitTimeSinceQueue)), check.Equals, 10)
709
710         // Create a pool without workers. The queued container will not be started, and the
711         // 'over quota' metric will be 1 because no workers are available and canCreate defaults
712         // to zero.
713         pool = stubPool{}
714         sch = New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 0, 0)
715         sch.runQueue()
716         sch.updateMetrics()
717
718         c.Check(int(testutil.ToFloat64(sch.mContainersAllocatedNotStarted)), check.Equals, 0)
719         c.Check(int(testutil.ToFloat64(sch.mContainersNotAllocatedOverQuota)), check.Equals, 1)
720         c.Check(int(testutil.ToFloat64(sch.mLongestWaitTimeSinceQueue)), check.Equals, 10)
721
722         // Reset the queue, and create a pool with an idle worker. The queued
723         // container will be started immediately and mLongestWaitTimeSinceQueue
724         // should be zero.
725         queue = test.Queue{
726                 ChooseType: chooseType,
727                 Containers: []arvados.Container{
728                         {
729                                 UUID:      test.ContainerUUID(1),
730                                 Priority:  1,
731                                 State:     arvados.ContainerStateLocked,
732                                 CreatedAt: time.Now().Add(-10 * time.Second),
733                                 RuntimeConstraints: arvados.RuntimeConstraints{
734                                         VCPUs: 1,
735                                         RAM:   1 << 30,
736                                 },
737                         },
738                 },
739         }
740         queue.Update()
741
742         pool = stubPool{
743                 idle:    map[arvados.InstanceType]int{test.InstanceType(1): 1},
744                 unalloc: map[arvados.InstanceType]int{test.InstanceType(1): 1},
745                 running: map[string]time.Time{},
746         }
747         sch = New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 0, 0)
748         sch.runQueue()
749         sch.updateMetrics()
750
751         c.Check(int(testutil.ToFloat64(sch.mLongestWaitTimeSinceQueue)), check.Equals, 0)
752 }
753
754 // Assign priority=4, 3 and 1 containers to idle nodes. Ignore the supervisor at priority 2.
755 func (*SchedulerSuite) TestSkipSupervisors(c *check.C) {
756         ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
757         queue := test.Queue{
758                 ChooseType: chooseType,
759                 Containers: []arvados.Container{
760                         {
761                                 UUID:     test.ContainerUUID(1),
762                                 Priority: 1,
763                                 State:    arvados.ContainerStateLocked,
764                                 RuntimeConstraints: arvados.RuntimeConstraints{
765                                         VCPUs: 1,
766                                         RAM:   1 << 30,
767                                 },
768                         },
769                         {
770                                 UUID:     test.ContainerUUID(2),
771                                 Priority: 2,
772                                 State:    arvados.ContainerStateLocked,
773                                 RuntimeConstraints: arvados.RuntimeConstraints{
774                                         VCPUs: 1,
775                                         RAM:   1 << 30,
776                                 },
777                                 SchedulingParameters: arvados.SchedulingParameters{
778                                         Supervisor: true,
779                                 },
780                         },
781                         {
782                                 UUID:     test.ContainerUUID(3),
783                                 Priority: 3,
784                                 State:    arvados.ContainerStateLocked,
785                                 RuntimeConstraints: arvados.RuntimeConstraints{
786                                         VCPUs: 1,
787                                         RAM:   1 << 30,
788                                 },
789                                 SchedulingParameters: arvados.SchedulingParameters{
790                                         Supervisor: true,
791                                 },
792                         },
793                         {
794                                 UUID:     test.ContainerUUID(4),
795                                 Priority: 4,
796                                 State:    arvados.ContainerStateLocked,
797                                 RuntimeConstraints: arvados.RuntimeConstraints{
798                                         VCPUs: 1,
799                                         RAM:   1 << 30,
800                                 },
801                                 SchedulingParameters: arvados.SchedulingParameters{
802                                         Supervisor: true,
803                                 },
804                         },
805                 },
806         }
807         queue.Update()
808         pool := stubPool{
809                 quota: 1000,
810                 unalloc: map[arvados.InstanceType]int{
811                         test.InstanceType(1): 4,
812                         test.InstanceType(2): 4,
813                 },
814                 idle: map[arvados.InstanceType]int{
815                         test.InstanceType(1): 4,
816                         test.InstanceType(2): 4,
817                 },
818                 running:   map[string]time.Time{},
819                 canCreate: 0,
820         }
821         New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 10, 0.2).runQueue()
822         c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType(nil))
823         c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4), test.ContainerUUID(3), test.ContainerUUID(1)})
824 }