Arvados-DCO-1.1-Signed-off-by: Dung Lam <dunglam@projectnelly.com>
[arvados.git] / lib / dispatchcloud / scheduler / run_queue_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package scheduler
6
7 import (
8         "context"
9         "sync"
10         "time"
11
12         "git.arvados.org/arvados.git/lib/dispatchcloud/test"
13         "git.arvados.org/arvados.git/lib/dispatchcloud/worker"
14         "git.arvados.org/arvados.git/sdk/go/arvados"
15         "git.arvados.org/arvados.git/sdk/go/ctxlog"
16
17         "github.com/prometheus/client_golang/prometheus/testutil"
18
19         check "gopkg.in/check.v1"
20 )
21
22 var (
23         // arbitrary example container UUIDs
24         uuids = func() (r []string) {
25                 for i := 0; i < 16; i++ {
26                         r = append(r, test.ContainerUUID(i))
27                 }
28                 return
29         }()
30 )
31
32 type stubPool struct {
33         notify    <-chan struct{}
34         unalloc   map[arvados.InstanceType]int // idle+booting+unknown
35         busy      map[arvados.InstanceType]int
36         idle      map[arvados.InstanceType]int
37         unknown   map[arvados.InstanceType]int
38         running   map[string]time.Time
39         quota     int
40         capacity  map[string]int
41         canCreate int
42         creates   []arvados.InstanceType
43         starts    []string
44         shutdowns int
45         sync.Mutex
46 }
47
48 func (p *stubPool) AtQuota() bool {
49         p.Lock()
50         defer p.Unlock()
51         n := len(p.running)
52         for _, nn := range p.unalloc {
53                 n += nn
54         }
55         for _, nn := range p.unknown {
56                 n += nn
57         }
58         return n >= p.quota
59 }
60 func (p *stubPool) AtCapacity(it arvados.InstanceType) bool {
61         supply, ok := p.capacity[it.ProviderType]
62         if !ok {
63                 return false
64         }
65         for _, existing := range []map[arvados.InstanceType]int{p.unalloc, p.busy} {
66                 for eit, n := range existing {
67                         if eit.ProviderType == it.ProviderType {
68                                 supply -= n
69                         }
70                 }
71         }
72         return supply < 1
73 }
74 func (p *stubPool) Subscribe() <-chan struct{}  { return p.notify }
75 func (p *stubPool) Unsubscribe(<-chan struct{}) {}
76 func (p *stubPool) Running() map[string]time.Time {
77         p.Lock()
78         defer p.Unlock()
79         r := map[string]time.Time{}
80         for k, v := range p.running {
81                 r[k] = v
82         }
83         return r
84 }
85 func (p *stubPool) Unallocated() map[arvados.InstanceType]int {
86         p.Lock()
87         defer p.Unlock()
88         r := map[arvados.InstanceType]int{}
89         for it, n := range p.unalloc {
90                 r[it] = n - p.unknown[it]
91         }
92         return r
93 }
94 func (p *stubPool) Create(it arvados.InstanceType) bool {
95         p.Lock()
96         defer p.Unlock()
97         p.creates = append(p.creates, it)
98         if p.canCreate < 1 {
99                 return false
100         }
101         p.canCreate--
102         p.unalloc[it]++
103         return true
104 }
105 func (p *stubPool) ForgetContainer(uuid string) {
106 }
107 func (p *stubPool) KillContainer(uuid, reason string) bool {
108         p.Lock()
109         defer p.Unlock()
110         defer delete(p.running, uuid)
111         t, ok := p.running[uuid]
112         return ok && t.IsZero()
113 }
114 func (p *stubPool) Shutdown(arvados.InstanceType) bool {
115         p.shutdowns++
116         return false
117 }
118 func (p *stubPool) CountWorkers() map[worker.State]int {
119         p.Lock()
120         defer p.Unlock()
121         return map[worker.State]int{
122                 worker.StateBooting: len(p.unalloc) - len(p.idle),
123                 worker.StateIdle:    len(p.idle),
124                 worker.StateRunning: len(p.running),
125                 worker.StateUnknown: len(p.unknown),
126         }
127 }
128 func (p *stubPool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
129         p.Lock()
130         defer p.Unlock()
131         p.starts = append(p.starts, ctr.UUID)
132         if p.idle[it] == 0 {
133                 return false
134         }
135         p.busy[it]++
136         p.idle[it]--
137         p.unalloc[it]--
138         p.running[ctr.UUID] = time.Time{}
139         return true
140 }
141
142 func chooseType(ctr *arvados.Container) ([]arvados.InstanceType, error) {
143         return []arvados.InstanceType{test.InstanceType(ctr.RuntimeConstraints.VCPUs)}, nil
144 }
145
146 var _ = check.Suite(&SchedulerSuite{})
147
148 type SchedulerSuite struct{}
149
150 // Assign priority=4 container to idle node. Create new instances for
151 // the priority=3, 2, 1 containers.
152 func (*SchedulerSuite) TestUseIdleWorkers(c *check.C) {
153         ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
154         queue := test.Queue{
155                 ChooseType: chooseType,
156                 Containers: []arvados.Container{
157                         {
158                                 UUID:     test.ContainerUUID(1),
159                                 Priority: 1,
160                                 State:    arvados.ContainerStateLocked,
161                                 RuntimeConstraints: arvados.RuntimeConstraints{
162                                         VCPUs: 1,
163                                         RAM:   1 << 30,
164                                 },
165                         },
166                         {
167                                 UUID:     test.ContainerUUID(2),
168                                 Priority: 2,
169                                 State:    arvados.ContainerStateLocked,
170                                 RuntimeConstraints: arvados.RuntimeConstraints{
171                                         VCPUs: 1,
172                                         RAM:   1 << 30,
173                                 },
174                         },
175                         {
176                                 UUID:     test.ContainerUUID(3),
177                                 Priority: 3,
178                                 State:    arvados.ContainerStateLocked,
179                                 RuntimeConstraints: arvados.RuntimeConstraints{
180                                         VCPUs: 1,
181                                         RAM:   1 << 30,
182                                 },
183                         },
184                         {
185                                 UUID:     test.ContainerUUID(4),
186                                 Priority: 4,
187                                 State:    arvados.ContainerStateLocked,
188                                 RuntimeConstraints: arvados.RuntimeConstraints{
189                                         VCPUs: 1,
190                                         RAM:   1 << 30,
191                                 },
192                         },
193                 },
194         }
195         queue.Update()
196         pool := stubPool{
197                 quota: 1000,
198                 unalloc: map[arvados.InstanceType]int{
199                         test.InstanceType(1): 1,
200                         test.InstanceType(2): 2,
201                 },
202                 idle: map[arvados.InstanceType]int{
203                         test.InstanceType(1): 1,
204                         test.InstanceType(2): 2,
205                 },
206                 busy:      map[arvados.InstanceType]int{},
207                 running:   map[string]time.Time{},
208                 canCreate: 0,
209         }
210         New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 0, 0).runQueue()
211         c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(1), test.InstanceType(1), test.InstanceType(1)})
212         c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4)})
213         c.Check(pool.running, check.HasLen, 1)
214         for uuid := range pool.running {
215                 c.Check(uuid, check.Equals, uuids[4])
216         }
217 }
218
219 // If pool.AtQuota() is true, shutdown some unalloc nodes, and don't
220 // call Create().
221 func (*SchedulerSuite) TestShutdownAtQuota(c *check.C) {
222         ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
223         for quota := 1; quota <= 3; quota++ {
224                 c.Logf("quota=%d", quota)
225                 queue := test.Queue{
226                         ChooseType: chooseType,
227                         Containers: []arvados.Container{
228                                 {
229                                         UUID:     test.ContainerUUID(2),
230                                         Priority: 2,
231                                         State:    arvados.ContainerStateLocked,
232                                         RuntimeConstraints: arvados.RuntimeConstraints{
233                                                 VCPUs: 2,
234                                                 RAM:   2 << 30,
235                                         },
236                                 },
237                                 {
238                                         UUID:     test.ContainerUUID(3),
239                                         Priority: 3,
240                                         State:    arvados.ContainerStateLocked,
241                                         RuntimeConstraints: arvados.RuntimeConstraints{
242                                                 VCPUs: 3,
243                                                 RAM:   3 << 30,
244                                         },
245                                 },
246                         },
247                 }
248                 queue.Update()
249                 pool := stubPool{
250                         quota: quota,
251                         unalloc: map[arvados.InstanceType]int{
252                                 test.InstanceType(2): 2,
253                         },
254                         idle: map[arvados.InstanceType]int{
255                                 test.InstanceType(2): 2,
256                         },
257                         busy:      map[arvados.InstanceType]int{},
258                         running:   map[string]time.Time{},
259                         creates:   []arvados.InstanceType{},
260                         starts:    []string{},
261                         canCreate: 0,
262                 }
263                 sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 0, 0)
264                 sch.sync()
265                 sch.runQueue()
266                 sch.sync()
267                 switch quota {
268                 case 1, 2:
269                         // Can't create a type3 node for ctr3, so we
270                         // shutdown an unallocated node (type2), and
271                         // unlock the 2nd-in-line container, but not
272                         // the 1st-in-line container.
273                         c.Check(pool.starts, check.HasLen, 0)
274                         c.Check(pool.shutdowns, check.Equals, 1)
275                         c.Check(pool.creates, check.HasLen, 0)
276                         c.Check(queue.StateChanges(), check.DeepEquals, []test.QueueStateChange{
277                                 {UUID: test.ContainerUUID(2), From: "Locked", To: "Queued"},
278                         })
279                 case 3:
280                         // Creating a type3 instance works, so we
281                         // start ctr2 on a type2 instance, and leave
282                         // ctr3 locked while we wait for the new
283                         // instance to come up.
284                         c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(2)})
285                         c.Check(pool.shutdowns, check.Equals, 0)
286                         c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(3)})
287                         c.Check(queue.StateChanges(), check.HasLen, 0)
288                 default:
289                         panic("test not written for quota>3")
290                 }
291         }
292 }
293
294 // If pool.AtCapacity(it) is true for one instance type, try running a
295 // lower-priority container that uses a different node type.  Don't
296 // lock/unlock/start any container that requires the affected instance
297 // type.
298 func (*SchedulerSuite) TestInstanceCapacity(c *check.C) {
299         ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
300
301         queue := test.Queue{
302                 ChooseType: chooseType,
303                 Containers: []arvados.Container{
304                         {
305                                 UUID:     test.ContainerUUID(1),
306                                 Priority: 1,
307                                 State:    arvados.ContainerStateLocked,
308                                 RuntimeConstraints: arvados.RuntimeConstraints{
309                                         VCPUs: 1,
310                                         RAM:   1 << 30,
311                                 },
312                         },
313                         {
314                                 UUID:     test.ContainerUUID(2),
315                                 Priority: 2,
316                                 State:    arvados.ContainerStateQueued,
317                                 RuntimeConstraints: arvados.RuntimeConstraints{
318                                         VCPUs: 4,
319                                         RAM:   4 << 30,
320                                 },
321                         },
322                         {
323                                 UUID:     test.ContainerUUID(3),
324                                 Priority: 3,
325                                 State:    arvados.ContainerStateLocked,
326                                 RuntimeConstraints: arvados.RuntimeConstraints{
327                                         VCPUs: 4,
328                                         RAM:   4 << 30,
329                                 },
330                         },
331                         {
332                                 UUID:     test.ContainerUUID(4),
333                                 Priority: 4,
334                                 State:    arvados.ContainerStateLocked,
335                                 RuntimeConstraints: arvados.RuntimeConstraints{
336                                         VCPUs: 4,
337                                         RAM:   4 << 30,
338                                 },
339                         },
340                 },
341         }
342         queue.Update()
343         pool := stubPool{
344                 quota:    99,
345                 capacity: map[string]int{test.InstanceType(4).ProviderType: 1},
346                 unalloc: map[arvados.InstanceType]int{
347                         test.InstanceType(4): 1,
348                 },
349                 idle: map[arvados.InstanceType]int{
350                         test.InstanceType(4): 1,
351                 },
352                 busy:      map[arvados.InstanceType]int{},
353                 running:   map[string]time.Time{},
354                 creates:   []arvados.InstanceType{},
355                 starts:    []string{},
356                 canCreate: 99,
357         }
358         sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 0, 0)
359         sch.sync()
360         sch.runQueue()
361         sch.sync()
362
363         // Start container4, but then pool reports AtCapacity for
364         // type4, so we skip trying to create an instance for
365         // container3, skip locking container2, but do try to create a
366         // type1 instance for container1.
367         c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4)})
368         c.Check(pool.shutdowns, check.Equals, 0)
369         c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(1)})
370         c.Check(queue.StateChanges(), check.HasLen, 0)
371 }
372
373 // Don't unlock containers or shutdown unalloc (booting/idle) nodes
374 // just because some 503 errors caused us to reduce maxConcurrency
375 // below the current load level.
376 //
377 // We expect to raise maxConcurrency soon when we stop seeing 503s. If
378 // that doesn't happen soon, the idle timeout will take care of the
379 // excess nodes.
380 func (*SchedulerSuite) TestIdleIn503QuietPeriod(c *check.C) {
381         ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
382         queue := test.Queue{
383                 ChooseType: chooseType,
384                 Containers: []arvados.Container{
385                         // scheduled on an instance (but not Running yet)
386                         {
387                                 UUID:     test.ContainerUUID(1),
388                                 Priority: 1000,
389                                 State:    arvados.ContainerStateLocked,
390                                 RuntimeConstraints: arvados.RuntimeConstraints{
391                                         VCPUs: 2,
392                                         RAM:   2 << 30,
393                                 },
394                         },
395                         // not yet scheduled
396                         {
397                                 UUID:     test.ContainerUUID(2),
398                                 Priority: 1000,
399                                 State:    arvados.ContainerStateLocked,
400                                 RuntimeConstraints: arvados.RuntimeConstraints{
401                                         VCPUs: 2,
402                                         RAM:   2 << 30,
403                                 },
404                         },
405                         // scheduled on an instance (but not Running yet)
406                         {
407                                 UUID:     test.ContainerUUID(3),
408                                 Priority: 1000,
409                                 State:    arvados.ContainerStateLocked,
410                                 RuntimeConstraints: arvados.RuntimeConstraints{
411                                         VCPUs: 3,
412                                         RAM:   3 << 30,
413                                 },
414                         },
415                         // not yet scheduled
416                         {
417                                 UUID:     test.ContainerUUID(4),
418                                 Priority: 1000,
419                                 State:    arvados.ContainerStateLocked,
420                                 RuntimeConstraints: arvados.RuntimeConstraints{
421                                         VCPUs: 3,
422                                         RAM:   3 << 30,
423                                 },
424                         },
425                         // not yet locked
426                         {
427                                 UUID:     test.ContainerUUID(5),
428                                 Priority: 1000,
429                                 State:    arvados.ContainerStateQueued,
430                                 RuntimeConstraints: arvados.RuntimeConstraints{
431                                         VCPUs: 3,
432                                         RAM:   3 << 30,
433                                 },
434                         },
435                 },
436         }
437         queue.Update()
438         pool := stubPool{
439                 quota: 16,
440                 unalloc: map[arvados.InstanceType]int{
441                         test.InstanceType(2): 2,
442                         test.InstanceType(3): 2,
443                 },
444                 idle: map[arvados.InstanceType]int{
445                         test.InstanceType(2): 1,
446                         test.InstanceType(3): 1,
447                 },
448                 busy: map[arvados.InstanceType]int{
449                         test.InstanceType(2): 1,
450                         test.InstanceType(3): 1,
451                 },
452                 running: map[string]time.Time{
453                         test.ContainerUUID(1): {},
454                         test.ContainerUUID(3): {},
455                 },
456                 creates:   []arvados.InstanceType{},
457                 starts:    []string{},
458                 canCreate: 0,
459         }
460         sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 0, 0)
461         sch.last503time = time.Now()
462         sch.maxConcurrency = 3
463         sch.sync()
464         sch.runQueue()
465         sch.sync()
466
467         c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(2)})
468         c.Check(pool.shutdowns, check.Equals, 0)
469         c.Check(pool.creates, check.HasLen, 0)
470         c.Check(queue.StateChanges(), check.HasLen, 0)
471 }
472
473 // If we somehow have more supervisor containers in Locked state than
474 // we should (e.g., config changed since they started), and some
475 // appropriate-sized instances booting up, unlock the excess
476 // supervisor containers, but let the instances keep booting.
477 func (*SchedulerSuite) TestUnlockExcessSupervisors(c *check.C) {
478         ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
479         queue := test.Queue{
480                 ChooseType: chooseType,
481         }
482         for i := 1; i <= 6; i++ {
483                 queue.Containers = append(queue.Containers, arvados.Container{
484                         UUID:     test.ContainerUUID(i),
485                         Priority: int64(1000 - i),
486                         State:    arvados.ContainerStateLocked,
487                         RuntimeConstraints: arvados.RuntimeConstraints{
488                                 VCPUs: 2,
489                                 RAM:   2 << 30,
490                         },
491                         SchedulingParameters: arvados.SchedulingParameters{
492                                 Supervisor: true,
493                         },
494                 })
495         }
496         queue.Update()
497         pool := stubPool{
498                 quota: 16,
499                 unalloc: map[arvados.InstanceType]int{
500                         test.InstanceType(2): 2,
501                 },
502                 idle: map[arvados.InstanceType]int{
503                         test.InstanceType(2): 1,
504                 },
505                 busy: map[arvados.InstanceType]int{
506                         test.InstanceType(2): 4,
507                 },
508                 running: map[string]time.Time{
509                         test.ContainerUUID(1): {},
510                         test.ContainerUUID(2): {},
511                         test.ContainerUUID(3): {},
512                         test.ContainerUUID(4): {},
513                 },
514                 creates:   []arvados.InstanceType{},
515                 starts:    []string{},
516                 canCreate: 0,
517         }
518         sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 8, 0.5)
519         sch.sync()
520         sch.runQueue()
521         sch.sync()
522
523         c.Check(pool.starts, check.DeepEquals, []string{})
524         c.Check(pool.shutdowns, check.Equals, 0)
525         c.Check(pool.creates, check.HasLen, 0)
526         c.Check(queue.StateChanges(), check.DeepEquals, []test.QueueStateChange{
527                 {UUID: test.ContainerUUID(5), From: "Locked", To: "Queued"},
528                 {UUID: test.ContainerUUID(6), From: "Locked", To: "Queued"},
529         })
530 }
531
532 // Assuming we're not at quota, don't try to shutdown idle nodes
533 // merely because we have more queued/locked supervisor containers
534 // than MaxSupervisors -- it won't help.
535 func (*SchedulerSuite) TestExcessSupervisors(c *check.C) {
536         ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
537         queue := test.Queue{
538                 ChooseType: chooseType,
539         }
540         for i := 1; i <= 8; i++ {
541                 queue.Containers = append(queue.Containers, arvados.Container{
542                         UUID:     test.ContainerUUID(i),
543                         Priority: int64(1000 + i),
544                         State:    arvados.ContainerStateQueued,
545                         RuntimeConstraints: arvados.RuntimeConstraints{
546                                 VCPUs: 2,
547                                 RAM:   2 << 30,
548                         },
549                         SchedulingParameters: arvados.SchedulingParameters{
550                                 Supervisor: true,
551                         },
552                 })
553         }
554         for i := 2; i < 4; i++ {
555                 queue.Containers[i].State = arvados.ContainerStateLocked
556         }
557         for i := 4; i < 6; i++ {
558                 queue.Containers[i].State = arvados.ContainerStateRunning
559         }
560         queue.Update()
561         pool := stubPool{
562                 quota: 16,
563                 unalloc: map[arvados.InstanceType]int{
564                         test.InstanceType(2): 2,
565                 },
566                 idle: map[arvados.InstanceType]int{
567                         test.InstanceType(2): 1,
568                 },
569                 busy: map[arvados.InstanceType]int{
570                         test.InstanceType(2): 2,
571                 },
572                 running: map[string]time.Time{
573                         test.ContainerUUID(5): {},
574                         test.ContainerUUID(6): {},
575                 },
576                 creates:   []arvados.InstanceType{},
577                 starts:    []string{},
578                 canCreate: 0,
579         }
580         sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 8, 0.5)
581         sch.sync()
582         sch.runQueue()
583         sch.sync()
584
585         c.Check(pool.starts, check.HasLen, 2)
586         c.Check(pool.shutdowns, check.Equals, 0)
587         c.Check(pool.creates, check.HasLen, 0)
588         c.Check(queue.StateChanges(), check.HasLen, 0)
589 }
590
591 // Don't flap lock/unlock when equal-priority containers compete for
592 // limited workers.
593 //
594 // (Unless we use FirstSeenAt as a secondary sort key, each runQueue()
595 // tends to choose a different one of the equal-priority containers as
596 // the "first" one that should be locked, and unlock the one it chose
597 // last time. This generates logging noise, and fails containers by
598 // reaching MaxDispatchAttempts quickly.)
599 func (*SchedulerSuite) TestEqualPriorityContainers(c *check.C) {
600         logger := ctxlog.TestLogger(c)
601         ctx := ctxlog.Context(context.Background(), logger)
602         queue := test.Queue{
603                 ChooseType: chooseType,
604                 Logger:     logger,
605         }
606         for i := 0; i < 8; i++ {
607                 queue.Containers = append(queue.Containers, arvados.Container{
608                         UUID:     test.ContainerUUID(i),
609                         Priority: 333,
610                         State:    arvados.ContainerStateQueued,
611                         RuntimeConstraints: arvados.RuntimeConstraints{
612                                 VCPUs: 3,
613                                 RAM:   3 << 30,
614                         },
615                 })
616         }
617         queue.Update()
618         pool := stubPool{
619                 quota: 2,
620                 unalloc: map[arvados.InstanceType]int{
621                         test.InstanceType(3): 2,
622                 },
623                 idle: map[arvados.InstanceType]int{
624                         test.InstanceType(3): 2,
625                 },
626                 busy:      map[arvados.InstanceType]int{},
627                 running:   map[string]time.Time{},
628                 creates:   []arvados.InstanceType{},
629                 starts:    []string{},
630                 canCreate: 0,
631         }
632         sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 0, 0)
633         for i := 0; i < 30; i++ {
634                 sch.runQueue()
635                 sch.sync()
636                 time.Sleep(time.Millisecond)
637         }
638         c.Check(pool.shutdowns, check.Equals, 0)
639         c.Check(pool.starts, check.HasLen, 2)
640         unlocked := map[string]int{}
641         for _, chg := range queue.StateChanges() {
642                 if chg.To == arvados.ContainerStateQueued {
643                         unlocked[chg.UUID]++
644                 }
645         }
646         for uuid, count := range unlocked {
647                 c.Check(count, check.Equals, 1, check.Commentf("%s", uuid))
648         }
649 }
650
651 // Start lower-priority containers while waiting for new/existing
652 // workers to come up for higher-priority containers.
653 func (*SchedulerSuite) TestStartWhileCreating(c *check.C) {
654         ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
655         pool := stubPool{
656                 quota: 1000,
657                 unalloc: map[arvados.InstanceType]int{
658                         test.InstanceType(1): 2,
659                         test.InstanceType(2): 2,
660                 },
661                 idle: map[arvados.InstanceType]int{
662                         test.InstanceType(1): 1,
663                         test.InstanceType(2): 1,
664                 },
665                 busy:      map[arvados.InstanceType]int{},
666                 running:   map[string]time.Time{},
667                 canCreate: 4,
668         }
669         queue := test.Queue{
670                 ChooseType: chooseType,
671                 Containers: []arvados.Container{
672                         {
673                                 // create a new worker
674                                 UUID:     test.ContainerUUID(1),
675                                 Priority: 1,
676                                 State:    arvados.ContainerStateLocked,
677                                 RuntimeConstraints: arvados.RuntimeConstraints{
678                                         VCPUs: 1,
679                                         RAM:   1 << 30,
680                                 },
681                         },
682                         {
683                                 // tentatively map to unalloc worker
684                                 UUID:     test.ContainerUUID(2),
685                                 Priority: 2,
686                                 State:    arvados.ContainerStateLocked,
687                                 RuntimeConstraints: arvados.RuntimeConstraints{
688                                         VCPUs: 1,
689                                         RAM:   1 << 30,
690                                 },
691                         },
692                         {
693                                 // start now on idle worker
694                                 UUID:     test.ContainerUUID(3),
695                                 Priority: 3,
696                                 State:    arvados.ContainerStateLocked,
697                                 RuntimeConstraints: arvados.RuntimeConstraints{
698                                         VCPUs: 1,
699                                         RAM:   1 << 30,
700                                 },
701                         },
702                         {
703                                 // create a new worker
704                                 UUID:     test.ContainerUUID(4),
705                                 Priority: 4,
706                                 State:    arvados.ContainerStateLocked,
707                                 RuntimeConstraints: arvados.RuntimeConstraints{
708                                         VCPUs: 2,
709                                         RAM:   2 << 30,
710                                 },
711                         },
712                         {
713                                 // tentatively map to unalloc worker
714                                 UUID:     test.ContainerUUID(5),
715                                 Priority: 5,
716                                 State:    arvados.ContainerStateLocked,
717                                 RuntimeConstraints: arvados.RuntimeConstraints{
718                                         VCPUs: 2,
719                                         RAM:   2 << 30,
720                                 },
721                         },
722                         {
723                                 // start now on idle worker
724                                 UUID:     test.ContainerUUID(6),
725                                 Priority: 6,
726                                 State:    arvados.ContainerStateLocked,
727                                 RuntimeConstraints: arvados.RuntimeConstraints{
728                                         VCPUs: 2,
729                                         RAM:   2 << 30,
730                                 },
731                         },
732                 },
733         }
734         queue.Update()
735         New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 0, 0).runQueue()
736         c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(2), test.InstanceType(1)})
737         c.Check(pool.starts, check.DeepEquals, []string{uuids[6], uuids[5], uuids[3], uuids[2]})
738         running := map[string]bool{}
739         for uuid, t := range pool.running {
740                 if t.IsZero() {
741                         running[uuid] = false
742                 } else {
743                         running[uuid] = true
744                 }
745         }
746         c.Check(running, check.DeepEquals, map[string]bool{uuids[3]: false, uuids[6]: false})
747 }
748
749 func (*SchedulerSuite) TestKillNonexistentContainer(c *check.C) {
750         ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
751         pool := stubPool{
752                 quota: 1000,
753                 unalloc: map[arvados.InstanceType]int{
754                         test.InstanceType(2): 0,
755                 },
756                 idle: map[arvados.InstanceType]int{
757                         test.InstanceType(2): 0,
758                 },
759                 busy: map[arvados.InstanceType]int{
760                         test.InstanceType(2): 1,
761                 },
762                 running: map[string]time.Time{
763                         test.ContainerUUID(2): {},
764                 },
765         }
766         queue := test.Queue{
767                 ChooseType: chooseType,
768                 Containers: []arvados.Container{
769                         {
770                                 // create a new worker
771                                 UUID:     test.ContainerUUID(1),
772                                 Priority: 1,
773                                 State:    arvados.ContainerStateLocked,
774                                 RuntimeConstraints: arvados.RuntimeConstraints{
775                                         VCPUs: 1,
776                                         RAM:   1 << 30,
777                                 },
778                         },
779                 },
780         }
781         queue.Update()
782         sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 0, 0)
783         c.Check(pool.running, check.HasLen, 1)
784         sch.sync()
785         for deadline := time.Now().Add(time.Second); len(pool.Running()) > 0 && time.Now().Before(deadline); time.Sleep(time.Millisecond) {
786         }
787         c.Check(pool.Running(), check.HasLen, 0)
788 }
789
790 func (*SchedulerSuite) TestContainersMetrics(c *check.C) {
791         ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
792         queue := test.Queue{
793                 ChooseType: chooseType,
794                 Containers: []arvados.Container{
795                         {
796                                 UUID:      test.ContainerUUID(1),
797                                 Priority:  1,
798                                 State:     arvados.ContainerStateLocked,
799                                 CreatedAt: time.Now().Add(-10 * time.Second),
800                                 RuntimeConstraints: arvados.RuntimeConstraints{
801                                         VCPUs: 1,
802                                         RAM:   1 << 30,
803                                 },
804                         },
805                 },
806         }
807         queue.Update()
808
809         // Create a pool with one unallocated (idle/booting/unknown) worker,
810         // and `idle` and `unknown` not set (empty). Iow this worker is in the booting
811         // state, and the container will be allocated but not started yet.
812         pool := stubPool{
813                 unalloc: map[arvados.InstanceType]int{test.InstanceType(1): 1},
814         }
815         sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 0, 0)
816         sch.runQueue()
817         sch.updateMetrics()
818
819         c.Check(int(testutil.ToFloat64(sch.mContainersAllocatedNotStarted)), check.Equals, 1)
820         c.Check(int(testutil.ToFloat64(sch.mContainersNotAllocatedOverQuota)), check.Equals, 0)
821         c.Check(int(testutil.ToFloat64(sch.mLongestWaitTimeSinceQueue)), check.Equals, 10)
822
823         // Create a pool without workers. The queued container will not be started, and the
824         // 'over quota' metric will be 1 because no workers are available and canCreate defaults
825         // to zero.
826         pool = stubPool{}
827         sch = New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 0, 0)
828         sch.runQueue()
829         sch.updateMetrics()
830
831         c.Check(int(testutil.ToFloat64(sch.mContainersAllocatedNotStarted)), check.Equals, 0)
832         c.Check(int(testutil.ToFloat64(sch.mContainersNotAllocatedOverQuota)), check.Equals, 1)
833         c.Check(int(testutil.ToFloat64(sch.mLongestWaitTimeSinceQueue)), check.Equals, 10)
834
835         // Reset the queue, and create a pool with an idle worker. The queued
836         // container will be started immediately and mLongestWaitTimeSinceQueue
837         // should be zero.
838         queue = test.Queue{
839                 ChooseType: chooseType,
840                 Containers: []arvados.Container{
841                         {
842                                 UUID:      test.ContainerUUID(1),
843                                 Priority:  1,
844                                 State:     arvados.ContainerStateLocked,
845                                 CreatedAt: time.Now().Add(-10 * time.Second),
846                                 RuntimeConstraints: arvados.RuntimeConstraints{
847                                         VCPUs: 1,
848                                         RAM:   1 << 30,
849                                 },
850                         },
851                 },
852         }
853         queue.Update()
854
855         pool = stubPool{
856                 idle:    map[arvados.InstanceType]int{test.InstanceType(1): 1},
857                 unalloc: map[arvados.InstanceType]int{test.InstanceType(1): 1},
858                 busy:    map[arvados.InstanceType]int{},
859                 running: map[string]time.Time{},
860         }
861         sch = New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 0, 0)
862         sch.runQueue()
863         sch.updateMetrics()
864
865         c.Check(int(testutil.ToFloat64(sch.mLongestWaitTimeSinceQueue)), check.Equals, 0)
866 }
867
868 // Assign priority=4, 3 and 1 containers to idle nodes. Ignore the supervisor at priority 2.
869 func (*SchedulerSuite) TestSkipSupervisors(c *check.C) {
870         ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
871         queue := test.Queue{
872                 ChooseType: chooseType,
873                 Containers: []arvados.Container{
874                         {
875                                 UUID:     test.ContainerUUID(1),
876                                 Priority: 1,
877                                 State:    arvados.ContainerStateLocked,
878                                 RuntimeConstraints: arvados.RuntimeConstraints{
879                                         VCPUs: 1,
880                                         RAM:   1 << 30,
881                                 },
882                         },
883                         {
884                                 UUID:     test.ContainerUUID(2),
885                                 Priority: 2,
886                                 State:    arvados.ContainerStateLocked,
887                                 RuntimeConstraints: arvados.RuntimeConstraints{
888                                         VCPUs: 1,
889                                         RAM:   1 << 30,
890                                 },
891                                 SchedulingParameters: arvados.SchedulingParameters{
892                                         Supervisor: true,
893                                 },
894                         },
895                         {
896                                 UUID:     test.ContainerUUID(3),
897                                 Priority: 3,
898                                 State:    arvados.ContainerStateLocked,
899                                 RuntimeConstraints: arvados.RuntimeConstraints{
900                                         VCPUs: 1,
901                                         RAM:   1 << 30,
902                                 },
903                                 SchedulingParameters: arvados.SchedulingParameters{
904                                         Supervisor: true,
905                                 },
906                         },
907                         {
908                                 UUID:     test.ContainerUUID(4),
909                                 Priority: 4,
910                                 State:    arvados.ContainerStateLocked,
911                                 RuntimeConstraints: arvados.RuntimeConstraints{
912                                         VCPUs: 1,
913                                         RAM:   1 << 30,
914                                 },
915                                 SchedulingParameters: arvados.SchedulingParameters{
916                                         Supervisor: true,
917                                 },
918                         },
919                 },
920         }
921         queue.Update()
922         pool := stubPool{
923                 quota: 1000,
924                 unalloc: map[arvados.InstanceType]int{
925                         test.InstanceType(1): 4,
926                         test.InstanceType(2): 4,
927                 },
928                 idle: map[arvados.InstanceType]int{
929                         test.InstanceType(1): 4,
930                         test.InstanceType(2): 4,
931                 },
932                 busy:      map[arvados.InstanceType]int{},
933                 running:   map[string]time.Time{},
934                 canCreate: 0,
935         }
936         New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 10, 0.2).runQueue()
937         c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType(nil))
938         c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4), test.ContainerUUID(3), test.ContainerUUID(1)})
939 }