1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
12 "git.curoverse.com/arvados.git/lib/dispatchcloud/container"
13 "git.curoverse.com/arvados.git/lib/dispatchcloud/test"
14 "git.curoverse.com/arvados.git/lib/dispatchcloud/worker"
15 "git.curoverse.com/arvados.git/sdk/go/arvados"
16 "github.com/Sirupsen/logrus"
17 check "gopkg.in/check.v1"
21 logger = logrus.StandardLogger()
23 // arbitrary example instance types
24 types = func() (r []arvados.InstanceType) {
25 for i := 0; i < 16; i++ {
26 r = append(r, test.InstanceType(i))
31 // arbitrary example container UUIDs
32 uuids = func() (r []string) {
33 for i := 0; i < 16; i++ {
34 r = append(r, test.ContainerUUID(i))
40 type stubQueue struct {
41 ents map[string]container.QueueEnt
44 func (q *stubQueue) Entries() (map[string]container.QueueEnt, time.Time) {
45 return q.ents, time.Now()
47 func (q *stubQueue) Lock(uuid string) error {
48 return q.setState(uuid, arvados.ContainerStateLocked)
50 func (q *stubQueue) Unlock(uuid string) error {
51 return q.setState(uuid, arvados.ContainerStateQueued)
53 func (q *stubQueue) Cancel(uuid string) error {
54 return q.setState(uuid, arvados.ContainerStateCancelled)
56 func (q *stubQueue) Forget(uuid string) {
58 func (q *stubQueue) Get(uuid string) (arvados.Container, bool) {
59 ent, ok := q.ents[uuid]
60 return ent.Container, ok
62 func (q *stubQueue) setState(uuid string, state arvados.ContainerState) error {
63 ent, ok := q.ents[uuid]
65 return fmt.Errorf("no such ent: %q", uuid)
67 ent.Container.State = state
72 type stubQuotaError struct {
76 func (stubQuotaError) IsQuotaError() bool { return true }
78 type stubPool struct {
79 notify <-chan struct{}
80 unalloc map[arvados.InstanceType]int // idle+booting+unknown
81 idle map[arvados.InstanceType]int
82 running map[string]time.Time
85 creates []arvados.InstanceType
90 func (p *stubPool) AtQuota() bool { return p.atQuota }
91 func (p *stubPool) Subscribe() <-chan struct{} { return p.notify }
92 func (p *stubPool) Unsubscribe(<-chan struct{}) {}
93 func (p *stubPool) Running() map[string]time.Time { return p.running }
94 func (p *stubPool) Unallocated() map[arvados.InstanceType]int {
95 r := map[arvados.InstanceType]int{}
96 for it, n := range p.unalloc {
101 func (p *stubPool) Create(it arvados.InstanceType) error {
102 p.creates = append(p.creates, it)
104 return stubQuotaError{errors.New("quota")}
110 func (p *stubPool) KillContainer(uuid string) {
111 p.running[uuid] = time.Now()
113 func (p *stubPool) Shutdown(arvados.InstanceType) bool {
117 func (p *stubPool) Workers() map[worker.State]int {
118 return map[worker.State]int{
119 worker.StateBooting: len(p.unalloc) - len(p.idle),
120 worker.StateRunning: len(p.idle) - len(p.running),
123 func (p *stubPool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
124 p.starts = append(p.starts, ctr.UUID)
130 p.running[ctr.UUID] = time.Time{}
134 var _ = check.Suite(&SchedulerSuite{})
136 type SchedulerSuite struct{}
138 // Map priority=4 container to idle node. Create a new instance for
139 // the priority=3 container. Don't try to start any priority<3
140 // containers because priority=3 container didn't start
141 // immediately. Don't try to create any other nodes after the failed create.
143 func (*SchedulerSuite) TestMapIdle(c *check.C) {
145 ents: map[string]container.QueueEnt{
147 Container: arvados.Container{UUID: uuids[1], Priority: 1, State: arvados.ContainerStateQueued},
148 InstanceType: types[1],
151 Container: arvados.Container{UUID: uuids[2], Priority: 2, State: arvados.ContainerStateQueued},
152 InstanceType: types[1],
155 Container: arvados.Container{UUID: uuids[3], Priority: 3, State: arvados.ContainerStateQueued},
156 InstanceType: types[1],
159 Container: arvados.Container{UUID: uuids[4], Priority: 4, State: arvados.ContainerStateQueued},
160 InstanceType: types[1],
165 unalloc: map[arvados.InstanceType]int{
169 idle: map[arvados.InstanceType]int{
173 running: map[string]time.Time{},
176 Map(logger, &queue, &pool)
177 c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{types[1]})
178 c.Check(pool.starts, check.DeepEquals, []string{uuids[4], uuids[3]})
179 c.Check(pool.running, check.HasLen, 1)
180 for uuid := range pool.running {
181 c.Check(uuid, check.Equals, uuids[4])
185 // Shutdown some nodes if Create() fails -- and without even calling
186 // Create(), if AtQuota() is true.
187 func (*SchedulerSuite) TestMapShutdownAtQuota(c *check.C) {
188 for quota := 0; quota < 2; quota++ {
189 shouldCreate := types[1 : 1+quota]
191 ents: map[string]container.QueueEnt{
193 Container: arvados.Container{UUID: uuids[1], Priority: 1, State: arvados.ContainerStateQueued},
194 InstanceType: types[1],
200 unalloc: map[arvados.InstanceType]int{
203 idle: map[arvados.InstanceType]int{
206 running: map[string]time.Time{},
207 creates: []arvados.InstanceType{},
211 Map(logger, &queue, &pool)
212 c.Check(pool.creates, check.DeepEquals, shouldCreate)
213 c.Check(pool.starts, check.DeepEquals, []string{})
214 c.Check(pool.shutdowns, check.Not(check.Equals), 0)
218 // Start lower-priority containers while waiting for new/existing
219 // workers to come up for higher-priority containers.
220 func (*SchedulerSuite) TestMapStartWhileCreating(c *check.C) {
222 unalloc: map[arvados.InstanceType]int{
226 idle: map[arvados.InstanceType]int{
230 running: map[string]time.Time{},
234 ents: map[string]container.QueueEnt{
236 // create a new worker
237 Container: arvados.Container{UUID: uuids[1], Priority: 1, State: arvados.ContainerStateQueued},
238 InstanceType: types[1],
241 // tentatively map to unalloc worker
242 Container: arvados.Container{UUID: uuids[2], Priority: 2, State: arvados.ContainerStateQueued},
243 InstanceType: types[1],
246 // start now on idle worker
247 Container: arvados.Container{UUID: uuids[3], Priority: 3, State: arvados.ContainerStateQueued},
248 InstanceType: types[1],
251 // create a new worker
252 Container: arvados.Container{UUID: uuids[4], Priority: 4, State: arvados.ContainerStateQueued},
253 InstanceType: types[2],
256 // tentatively map to unalloc worker
257 Container: arvados.Container{UUID: uuids[5], Priority: 5, State: arvados.ContainerStateQueued},
258 InstanceType: types[2],
261 // start now on idle worker
262 Container: arvados.Container{UUID: uuids[6], Priority: 6, State: arvados.ContainerStateQueued},
263 InstanceType: types[2],
267 Map(logger, &queue, &pool)
268 c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{types[2], types[1]})
269 c.Check(pool.starts, check.DeepEquals, []string{uuids[6], uuids[5], uuids[3], uuids[2]})
270 running := map[string]bool{}
271 for uuid, t := range pool.running {
273 running[uuid] = false
278 c.Check(running, check.DeepEquals, map[string]bool{uuids[3]: false, uuids[6]: false})