import (
"errors"
- "fmt"
"time"
- "git.curoverse.com/arvados.git/lib/dispatchcloud/container"
"git.curoverse.com/arvados.git/lib/dispatchcloud/test"
"git.curoverse.com/arvados.git/lib/dispatchcloud/worker"
"git.curoverse.com/arvados.git/sdk/go/arvados"
var (
logger = logrus.StandardLogger()
- // arbitrary example instance types
- types = func() (r []arvados.InstanceType) {
- for i := 0; i < 16; i++ {
- r = append(r, test.InstanceType(i))
- }
- return
- }()
-
// arbitrary example container UUIDs
uuids = func() (r []string) {
for i := 0; i < 16; i++ {
}()
)
-type stubQueue struct {
- ents map[string]container.QueueEnt
-}
-
-func (q *stubQueue) Entries() (map[string]container.QueueEnt, time.Time) {
- return q.ents, time.Now()
-}
-func (q *stubQueue) Lock(uuid string) error {
- return q.setState(uuid, arvados.ContainerStateLocked)
-}
-func (q *stubQueue) Unlock(uuid string) error {
- return q.setState(uuid, arvados.ContainerStateQueued)
-}
-func (q *stubQueue) Cancel(uuid string) error {
- return q.setState(uuid, arvados.ContainerStateCancelled)
-}
-func (q *stubQueue) Forget(uuid string) {
-}
-func (q *stubQueue) Get(uuid string) (arvados.Container, bool) {
- ent, ok := q.ents[uuid]
- return ent.Container, ok
-}
-func (q *stubQueue) setState(uuid string, state arvados.ContainerState) error {
- ent, ok := q.ents[uuid]
- if !ok {
- return fmt.Errorf("no such ent: %q", uuid)
- }
- ent.Container.State = state
- q.ents[uuid] = ent
- return nil
-}
-
// stubQuotaError wraps an arbitrary error value. Because the error
// interface is embedded, the wrapped value's Error method is promoted
// and a stubQuotaError can be used anywhere an error is expected.
//
// NOTE(review): presumably used by the stub pool to simulate a
// provider quota failure -- confirm against its uses elsewhere in
// this file.
type stubQuotaError struct{ error }
// SchedulerSuite is the receiver type for the scheduler test methods
// in this file. It has no fields, so it carries no state between
// tests; each test method takes a *check.C.
//
// NOTE(review): presumably registered with the gocheck runner
// elsewhere in the package -- confirm.
type SchedulerSuite struct{}
-// Map priority=4 container to idle node. Create a new instance for
+// Assign priority=4 container to idle node. Create a new instance for
// the priority=3 container. Don't try to start any priority<3
// containers because priority=3 container didn't start
// immediately. Don't try to create any other nodes after the failed
// create.
-func (*SchedulerSuite) TestMapIdle(c *check.C) {
- queue := stubQueue{
- ents: map[string]container.QueueEnt{
- uuids[1]: {
- Container: arvados.Container{UUID: uuids[1], Priority: 1, State: arvados.ContainerStateQueued},
- InstanceType: types[1],
+func (*SchedulerSuite) TestUseIdleWorkers(c *check.C) {
+ queue := test.Queue{
+ ChooseType: func(ctr *arvados.Container) (arvados.InstanceType, error) {
+ return test.InstanceType(ctr.RuntimeConstraints.VCPUs), nil
+ },
+ Containers: []arvados.Container{
+ {
+ UUID: test.ContainerUUID(1),
+ Priority: 1,
+ State: arvados.ContainerStateQueued,
+ RuntimeConstraints: arvados.RuntimeConstraints{
+ VCPUs: 1,
+ RAM: 1 << 30,
+ },
},
- uuids[2]: {
- Container: arvados.Container{UUID: uuids[2], Priority: 2, State: arvados.ContainerStateQueued},
- InstanceType: types[1],
+ {
+ UUID: test.ContainerUUID(2),
+ Priority: 2,
+ State: arvados.ContainerStateQueued,
+ RuntimeConstraints: arvados.RuntimeConstraints{
+ VCPUs: 1,
+ RAM: 1 << 30,
+ },
},
- uuids[3]: {
- Container: arvados.Container{UUID: uuids[3], Priority: 3, State: arvados.ContainerStateQueued},
- InstanceType: types[1],
+ {
+ UUID: test.ContainerUUID(3),
+ Priority: 3,
+ State: arvados.ContainerStateQueued,
+ RuntimeConstraints: arvados.RuntimeConstraints{
+ VCPUs: 1,
+ RAM: 1 << 30,
+ },
},
- uuids[4]: {
- Container: arvados.Container{UUID: uuids[4], Priority: 4, State: arvados.ContainerStateQueued},
- InstanceType: types[1],
+ {
+ UUID: test.ContainerUUID(4),
+ Priority: 4,
+ State: arvados.ContainerStateQueued,
+ RuntimeConstraints: arvados.RuntimeConstraints{
+ VCPUs: 1,
+ RAM: 1 << 30,
+ },
},
},
}
+ queue.Update()
pool := stubPool{
unalloc: map[arvados.InstanceType]int{
- types[1]: 1,
- types[2]: 2,
+ test.InstanceType(1): 1,
+ test.InstanceType(2): 2,
},
idle: map[arvados.InstanceType]int{
- types[1]: 1,
- types[2]: 2,
+ test.InstanceType(1): 1,
+ test.InstanceType(2): 2,
},
running: map[string]time.Time{},
canCreate: 1,
}
- Map(logger, &queue, &pool)
- c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{types[1]})
- c.Check(pool.starts, check.DeepEquals, []string{uuids[4], uuids[3]})
+ New(logger, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
+ c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(1)})
+ c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4), test.ContainerUUID(3)})
c.Check(pool.running, check.HasLen, 1)
for uuid := range pool.running {
c.Check(uuid, check.Equals, uuids[4])
// Shut down some nodes if Create() fails -- and without even calling
// Create(), if AtQuota() is true.
-func (*SchedulerSuite) TestMapShutdownAtQuota(c *check.C) {
+func (*SchedulerSuite) TestShutdownAtQuota(c *check.C) {
for quota := 0; quota < 2; quota++ {
- shouldCreate := types[1 : 1+quota]
- queue := stubQueue{
- ents: map[string]container.QueueEnt{
- uuids[1]: {
- Container: arvados.Container{UUID: uuids[1], Priority: 1, State: arvados.ContainerStateQueued},
- InstanceType: types[1],
+ shouldCreate := []arvados.InstanceType{}
+ for i := 1; i < 1+quota; i++ {
+ shouldCreate = append(shouldCreate, test.InstanceType(i))
+ }
+ queue := test.Queue{
+ ChooseType: func(ctr *arvados.Container) (arvados.InstanceType, error) {
+ return test.InstanceType(ctr.RuntimeConstraints.VCPUs), nil
+ },
+ Containers: []arvados.Container{
+ {
+ UUID: test.ContainerUUID(1),
+ Priority: 1,
+ State: arvados.ContainerStateQueued,
+ RuntimeConstraints: arvados.RuntimeConstraints{
+ VCPUs: 1,
+ RAM: 1 << 30,
+ },
},
},
}
+ queue.Update()
pool := stubPool{
atQuota: quota == 0,
unalloc: map[arvados.InstanceType]int{
- types[2]: 2,
+ test.InstanceType(2): 2,
},
idle: map[arvados.InstanceType]int{
- types[2]: 2,
+ test.InstanceType(2): 2,
},
running: map[string]time.Time{},
creates: []arvados.InstanceType{},
starts: []string{},
canCreate: 0,
}
- Map(logger, &queue, &pool)
+ New(logger, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
c.Check(pool.creates, check.DeepEquals, shouldCreate)
c.Check(pool.starts, check.DeepEquals, []string{})
c.Check(pool.shutdowns, check.Not(check.Equals), 0)
// Start lower-priority containers while waiting for new/existing
// workers to come up for higher-priority containers.
-func (*SchedulerSuite) TestMapStartWhileCreating(c *check.C) {
+func (*SchedulerSuite) TestStartWhileCreating(c *check.C) {
pool := stubPool{
unalloc: map[arvados.InstanceType]int{
- types[1]: 1,
- types[2]: 1,
+ test.InstanceType(1): 1,
+ test.InstanceType(2): 1,
},
idle: map[arvados.InstanceType]int{
- types[1]: 1,
- types[2]: 1,
+ test.InstanceType(1): 1,
+ test.InstanceType(2): 1,
},
running: map[string]time.Time{},
canCreate: 2,
}
- queue := stubQueue{
- ents: map[string]container.QueueEnt{
- uuids[1]: {
+ queue := test.Queue{
+ ChooseType: func(ctr *arvados.Container) (arvados.InstanceType, error) {
+ return test.InstanceType(ctr.RuntimeConstraints.VCPUs), nil
+ },
+ Containers: []arvados.Container{
+ {
// create a new worker
- Container: arvados.Container{UUID: uuids[1], Priority: 1, State: arvados.ContainerStateQueued},
- InstanceType: types[1],
+ UUID: test.ContainerUUID(1),
+ Priority: 1,
+ State: arvados.ContainerStateQueued,
+ RuntimeConstraints: arvados.RuntimeConstraints{
+ VCPUs: 1,
+ RAM: 1 << 30,
+ },
},
- uuids[2]: {
+ {
// tentatively map to unalloc worker
- Container: arvados.Container{UUID: uuids[2], Priority: 2, State: arvados.ContainerStateQueued},
- InstanceType: types[1],
+ UUID: test.ContainerUUID(2),
+ Priority: 2,
+ State: arvados.ContainerStateQueued,
+ RuntimeConstraints: arvados.RuntimeConstraints{
+ VCPUs: 1,
+ RAM: 1 << 30,
+ },
},
- uuids[3]: {
+ {
// start now on idle worker
- Container: arvados.Container{UUID: uuids[3], Priority: 3, State: arvados.ContainerStateQueued},
- InstanceType: types[1],
+ UUID: test.ContainerUUID(3),
+ Priority: 3,
+ State: arvados.ContainerStateQueued,
+ RuntimeConstraints: arvados.RuntimeConstraints{
+ VCPUs: 1,
+ RAM: 1 << 30,
+ },
},
- uuids[4]: {
+ {
// create a new worker
- Container: arvados.Container{UUID: uuids[4], Priority: 4, State: arvados.ContainerStateQueued},
- InstanceType: types[2],
+ UUID: test.ContainerUUID(4),
+ Priority: 4,
+ State: arvados.ContainerStateQueued,
+ RuntimeConstraints: arvados.RuntimeConstraints{
+ VCPUs: 2,
+ RAM: 2 << 30,
+ },
},
- uuids[5]: {
+ {
// tentatively map to unalloc worker
- Container: arvados.Container{UUID: uuids[5], Priority: 5, State: arvados.ContainerStateQueued},
- InstanceType: types[2],
+ UUID: test.ContainerUUID(5),
+ Priority: 5,
+ State: arvados.ContainerStateQueued,
+ RuntimeConstraints: arvados.RuntimeConstraints{
+ VCPUs: 2,
+ RAM: 2 << 30,
+ },
},
- uuids[6]: {
+ {
// start now on idle worker
- Container: arvados.Container{UUID: uuids[6], Priority: 6, State: arvados.ContainerStateQueued},
- InstanceType: types[2],
+ UUID: test.ContainerUUID(6),
+ Priority: 6,
+ State: arvados.ContainerStateQueued,
+ RuntimeConstraints: arvados.RuntimeConstraints{
+ VCPUs: 2,
+ RAM: 2 << 30,
+ },
},
},
}
- Map(logger, &queue, &pool)
- c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{types[2], types[1]})
+ queue.Update()
+ New(logger, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
+ c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(2), test.InstanceType(1)})
c.Check(pool.starts, check.DeepEquals, []string{uuids[6], uuids[5], uuids[3], uuids[2]})
running := map[string]bool{}
for uuid, t := range pool.running {