package scheduler
import (
- "errors"
+ "context"
+ "sync"
"time"
- "git.curoverse.com/arvados.git/lib/dispatchcloud/test"
- "git.curoverse.com/arvados.git/lib/dispatchcloud/worker"
- "git.curoverse.com/arvados.git/sdk/go/arvados"
- "github.com/Sirupsen/logrus"
+ "git.arvados.org/arvados.git/lib/dispatchcloud/test"
+ "git.arvados.org/arvados.git/lib/dispatchcloud/worker"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
check "gopkg.in/check.v1"
)
var (
- logger = logrus.StandardLogger()
-
// arbitrary example container UUIDs
uuids = func() (r []string) {
for i := 0; i < 16; i++ {
notify <-chan struct{}
unalloc map[arvados.InstanceType]int // idle+booting+unknown
idle map[arvados.InstanceType]int
+ unknown map[arvados.InstanceType]int
running map[string]time.Time
atQuota bool
canCreate int
creates []arvados.InstanceType
starts []string
shutdowns int
+ sync.Mutex
}
-func (p *stubPool) AtQuota() bool { return p.atQuota }
-func (p *stubPool) Subscribe() <-chan struct{} { return p.notify }
-func (p *stubPool) Unsubscribe(<-chan struct{}) {}
-func (p *stubPool) Running() map[string]time.Time { return p.running }
+func (p *stubPool) AtQuota() bool { return p.atQuota }
+func (p *stubPool) Subscribe() <-chan struct{} { return p.notify }
+func (p *stubPool) Unsubscribe(<-chan struct{}) {}
+func (p *stubPool) Running() map[string]time.Time {
+ p.Lock()
+ defer p.Unlock()
+ r := map[string]time.Time{}
+ for k, v := range p.running {
+ r[k] = v
+ }
+ return r
+}
func (p *stubPool) Unallocated() map[arvados.InstanceType]int {
+ p.Lock()
+ defer p.Unlock()
r := map[arvados.InstanceType]int{}
for it, n := range p.unalloc {
- r[it] = n
+ r[it] = n - p.unknown[it]
}
return r
}
-func (p *stubPool) Create(it arvados.InstanceType) error {
+func (p *stubPool) Create(it arvados.InstanceType) bool {
+ p.Lock()
+ defer p.Unlock()
p.creates = append(p.creates, it)
if p.canCreate < 1 {
- return stubQuotaError{errors.New("quota")}
+ return false
}
p.canCreate--
p.unalloc[it]++
- return nil
+ return true
}
-func (p *stubPool) KillContainer(uuid string) {
- p.running[uuid] = time.Now()
+func (p *stubPool) ForgetContainer(uuid string) {
+}
+func (p *stubPool) KillContainer(uuid, reason string) bool {
+ p.Lock()
+ defer p.Unlock()
+ defer delete(p.running, uuid)
+ t, ok := p.running[uuid]
+ return ok && t.IsZero()
}
func (p *stubPool) Shutdown(arvados.InstanceType) bool {
p.shutdowns++
return false
}
-func (p *stubPool) Workers() map[worker.State]int {
+func (p *stubPool) CountWorkers() map[worker.State]int {
+ p.Lock()
+ defer p.Unlock()
return map[worker.State]int{
worker.StateBooting: len(p.unalloc) - len(p.idle),
worker.StateIdle: len(p.idle),
worker.StateRunning: len(p.running),
+ worker.StateUnknown: len(p.unknown),
}
}
func (p *stubPool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
+ p.Lock()
+ defer p.Unlock()
p.starts = append(p.starts, ctr.UUID)
if p.idle[it] == 0 {
return false
return true
}
+func chooseType(ctr *arvados.Container) (arvados.InstanceType, error) {
+ return test.InstanceType(ctr.RuntimeConstraints.VCPUs), nil
+}
+
var _ = check.Suite(&SchedulerSuite{})
type SchedulerSuite struct{}
// immediately. Don't try to create any other nodes after the failed
// create.
func (*SchedulerSuite) TestUseIdleWorkers(c *check.C) {
+ ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
queue := test.Queue{
- ChooseType: func(ctr *arvados.Container) (arvados.InstanceType, error) {
- return test.InstanceType(ctr.RuntimeConstraints.VCPUs), nil
- },
+ ChooseType: chooseType,
Containers: []arvados.Container{
{
UUID: test.ContainerUUID(1),
Priority: 1,
- State: arvados.ContainerStateQueued,
+ State: arvados.ContainerStateLocked,
RuntimeConstraints: arvados.RuntimeConstraints{
VCPUs: 1,
RAM: 1 << 30,
{
UUID: test.ContainerUUID(2),
Priority: 2,
- State: arvados.ContainerStateQueued,
+ State: arvados.ContainerStateLocked,
RuntimeConstraints: arvados.RuntimeConstraints{
VCPUs: 1,
RAM: 1 << 30,
{
UUID: test.ContainerUUID(3),
Priority: 3,
- State: arvados.ContainerStateQueued,
+ State: arvados.ContainerStateLocked,
RuntimeConstraints: arvados.RuntimeConstraints{
VCPUs: 1,
RAM: 1 << 30,
{
UUID: test.ContainerUUID(4),
Priority: 4,
- State: arvados.ContainerStateQueued,
+ State: arvados.ContainerStateLocked,
RuntimeConstraints: arvados.RuntimeConstraints{
VCPUs: 1,
RAM: 1 << 30,
test.InstanceType(2): 2,
},
running: map[string]time.Time{},
- canCreate: 1,
+ canCreate: 0,
}
- New(logger, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
+ New(ctx, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(1)})
- c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4), test.ContainerUUID(3)})
+ c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4)})
c.Check(pool.running, check.HasLen, 1)
for uuid := range pool.running {
c.Check(uuid, check.Equals, uuids[4])
}
}
-// Shutdown some nodes if Create() fails -- and without even calling
-// Create(), if AtQuota() is true.
+// If Create() fails, shutdown some nodes, and don't call Create()
+// again. Don't call Create() at all if AtQuota() is true.
func (*SchedulerSuite) TestShutdownAtQuota(c *check.C) {
+ ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
for quota := 0; quota < 2; quota++ {
+ c.Logf("quota=%d", quota)
shouldCreate := []arvados.InstanceType{}
- for i := 1; i < 1+quota; i++ {
- shouldCreate = append(shouldCreate, test.InstanceType(i))
+ for i := 0; i < quota; i++ {
+ shouldCreate = append(shouldCreate, test.InstanceType(3))
}
queue := test.Queue{
- ChooseType: func(ctr *arvados.Container) (arvados.InstanceType, error) {
- return test.InstanceType(ctr.RuntimeConstraints.VCPUs), nil
- },
+ ChooseType: chooseType,
Containers: []arvados.Container{
{
- UUID: test.ContainerUUID(1),
- Priority: 1,
- State: arvados.ContainerStateQueued,
+ UUID: test.ContainerUUID(2),
+ Priority: 2,
+ State: arvados.ContainerStateLocked,
RuntimeConstraints: arvados.RuntimeConstraints{
- VCPUs: 1,
- RAM: 1 << 30,
+ VCPUs: 2,
+ RAM: 2 << 30,
+ },
+ },
+ {
+ UUID: test.ContainerUUID(3),
+ Priority: 3,
+ State: arvados.ContainerStateLocked,
+ RuntimeConstraints: arvados.RuntimeConstraints{
+ VCPUs: 3,
+ RAM: 3 << 30,
},
},
},
starts: []string{},
canCreate: 0,
}
- New(logger, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
+ New(ctx, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
c.Check(pool.creates, check.DeepEquals, shouldCreate)
c.Check(pool.starts, check.DeepEquals, []string{})
c.Check(pool.shutdowns, check.Not(check.Equals), 0)
// Start lower-priority containers while waiting for new/existing
// workers to come up for higher-priority containers.
func (*SchedulerSuite) TestStartWhileCreating(c *check.C) {
+ ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
pool := stubPool{
unalloc: map[arvados.InstanceType]int{
- test.InstanceType(1): 1,
- test.InstanceType(2): 1,
+ test.InstanceType(1): 2,
+ test.InstanceType(2): 2,
},
idle: map[arvados.InstanceType]int{
test.InstanceType(1): 1,
test.InstanceType(2): 1,
},
running: map[string]time.Time{},
- canCreate: 2,
+ canCreate: 4,
}
queue := test.Queue{
- ChooseType: func(ctr *arvados.Container) (arvados.InstanceType, error) {
- return test.InstanceType(ctr.RuntimeConstraints.VCPUs), nil
- },
+ ChooseType: chooseType,
Containers: []arvados.Container{
{
// create a new worker
UUID: test.ContainerUUID(1),
Priority: 1,
- State: arvados.ContainerStateQueued,
+ State: arvados.ContainerStateLocked,
RuntimeConstraints: arvados.RuntimeConstraints{
VCPUs: 1,
RAM: 1 << 30,
// tentatively map to unalloc worker
UUID: test.ContainerUUID(2),
Priority: 2,
- State: arvados.ContainerStateQueued,
+ State: arvados.ContainerStateLocked,
RuntimeConstraints: arvados.RuntimeConstraints{
VCPUs: 1,
RAM: 1 << 30,
// start now on idle worker
UUID: test.ContainerUUID(3),
Priority: 3,
- State: arvados.ContainerStateQueued,
+ State: arvados.ContainerStateLocked,
RuntimeConstraints: arvados.RuntimeConstraints{
VCPUs: 1,
RAM: 1 << 30,
// create a new worker
UUID: test.ContainerUUID(4),
Priority: 4,
- State: arvados.ContainerStateQueued,
+ State: arvados.ContainerStateLocked,
RuntimeConstraints: arvados.RuntimeConstraints{
VCPUs: 2,
RAM: 2 << 30,
// tentatively map to unalloc worker
UUID: test.ContainerUUID(5),
Priority: 5,
- State: arvados.ContainerStateQueued,
+ State: arvados.ContainerStateLocked,
RuntimeConstraints: arvados.RuntimeConstraints{
VCPUs: 2,
RAM: 2 << 30,
// start now on idle worker
UUID: test.ContainerUUID(6),
Priority: 6,
- State: arvados.ContainerStateQueued,
+ State: arvados.ContainerStateLocked,
RuntimeConstraints: arvados.RuntimeConstraints{
VCPUs: 2,
RAM: 2 << 30,
},
}
queue.Update()
- New(logger, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
+ New(ctx, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(2), test.InstanceType(1)})
c.Check(pool.starts, check.DeepEquals, []string{uuids[6], uuids[5], uuids[3], uuids[2]})
running := map[string]bool{}
}
c.Check(running, check.DeepEquals, map[string]bool{uuids[3]: false, uuids[6]: false})
}
+
+func (*SchedulerSuite) TestKillNonexistentContainer(c *check.C) {
+ ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
+ pool := stubPool{
+ unalloc: map[arvados.InstanceType]int{
+ test.InstanceType(2): 0,
+ },
+ idle: map[arvados.InstanceType]int{
+ test.InstanceType(2): 0,
+ },
+ running: map[string]time.Time{
+ test.ContainerUUID(2): time.Time{},
+ },
+ }
+ queue := test.Queue{
+ ChooseType: chooseType,
+ Containers: []arvados.Container{
+ {
+ // create a new worker
+ UUID: test.ContainerUUID(1),
+ Priority: 1,
+ State: arvados.ContainerStateLocked,
+ RuntimeConstraints: arvados.RuntimeConstraints{
+ VCPUs: 1,
+ RAM: 1 << 30,
+ },
+ },
+ },
+ }
+ queue.Update()
+ sch := New(ctx, &queue, &pool, time.Millisecond, time.Millisecond)
+ c.Check(pool.running, check.HasLen, 1)
+ sch.sync()
+ for deadline := time.Now().Add(time.Second); len(pool.Running()) > 0 && time.Now().Before(deadline); time.Sleep(time.Millisecond) {
+ }
+ c.Check(pool.Running(), check.HasLen, 0)
+}