notify <-chan struct{}
unalloc map[arvados.InstanceType]int // idle+booting+unknown
idle map[arvados.InstanceType]int
+ unknown map[arvados.InstanceType]int // also counted in unalloc
running map[string]time.Time
atQuota bool
canCreate int
defer p.Unlock()
r := map[arvados.InstanceType]int{}
for it, n := range p.unalloc {
- r[it] = n
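+ // Workers in the Unknown state are counted in unalloc but
+ // can't be allocated, so exclude them here.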
+ r[it] = n - p.unknown[it]
}
return r
}
worker.StateBooting: len(p.unalloc) - len(p.idle),
worker.StateIdle: len(p.idle),
worker.StateRunning: len(p.running),
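+ // Expose unknown-state workers so the scheduler can see them.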
+ worker.StateUnknown: len(p.unknown),
}
}
func (p *stubPool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
"fmt"
"git.arvados.org/arvados.git/lib/dispatchcloud/container"
+ "git.arvados.org/arvados.git/lib/dispatchcloud/worker"
"git.arvados.org/arvados.git/sdk/go/arvados"
"github.com/sirupsen/logrus"
)
// Running containers whose crunch-run processes have exited are
// cancelled.
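+//
+// Running containers that aren't reported by any worker are left
+// alone while any worker is in the Unknown state, because they
+// might be running on that worker.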
func (sch *Scheduler) sync() {
+ anyUnknownWorkers := sch.pool.CountWorkers()[worker.StateUnknown] > 0
running := sch.pool.Running()
qEntries, qUpdated := sch.queue.Entries()
for uuid, ent := range qEntries {
exited, running := running[uuid]
switch ent.Container.State {
case arvados.ContainerStateRunning:
if !running {
- go sch.cancel(uuid, "not running on any worker")
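+ // Cancel only if no worker is in the Unknown state; an
+ // unknown worker might still be running this container.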
+ if !anyUnknownWorkers {
+ go sch.cancel(uuid, "not running on any worker")
+ }
} else if !exited.IsZero() && qUpdated.After(exited) {
go sch.cancel(uuid, "state=Running after crunch-run exited")
} else if ent.Container.Priority == 0 {
ents, _ = queue.Entries()
c.Check(ents, check.HasLen, 0)
}
+
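+// TestCancelOrphanedContainers verifies that a container in state
+// Running is left alone while any worker is in the Unknown state
+// (it might be running there), and is cancelled and forgotten once
+// no worker state is unknown.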
+func (*SchedulerSuite) TestCancelOrphanedContainers(c *check.C) {
+ ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
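+ // One instance type with one worker in the Unknown state; the
+ // worker is counted in both unalloc and unknown.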
+ pool := stubPool{
+ unalloc: map[arvados.InstanceType]int{test.InstanceType(1): 1},
+ unknown: map[arvados.InstanceType]int{test.InstanceType(1): 1},
+ }
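+ // One container that claims to be Running but isn't reported
+ // by any worker.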
+ queue := test.Queue{
+ ChooseType: chooseType,
+ Containers: []arvados.Container{
+ {
+ UUID: test.ContainerUUID(1),
+ Priority: 0,
+ State: arvados.ContainerStateRunning,
+ RuntimeConstraints: arvados.RuntimeConstraints{
+ VCPUs: 1,
+ RAM: 1 << 30,
+ },
+ },
+ },
+ }
+ queue.Update()
+
+ ents, _ := queue.Entries()
+ c.Check(ents, check.HasLen, 1)
+
+ sch := New(ctx, &queue, &pool, time.Millisecond, time.Millisecond)
+
+ // Sync shouldn't cancel the container because it might be
+ // running on the VM with state=="unknown".
+ //
+ // (Cancel+forget happens asynchronously and requires multiple
+ // sync() calls, so even after 10x sync-and-sleep iterations,
+ // we aren't 100% confident that sync isn't trying to
+ // cancel. But in the test environment, the goroutines started
+ // by sync() access stubs and therefore run quickly, so it
+ // works fine in practice. We accept that if the code is
+ // broken, the test will still pass occasionally.)
+ for i := 0; i < 10; i++ {
+ sch.sync()
+ time.Sleep(time.Millisecond)
+ }
+ ents, _ = queue.Entries()
+ c.Check(ents, check.HasLen, 1)
+ c.Check(ents[test.ContainerUUID(1)].Container.State, check.Equals, arvados.ContainerStateRunning)
+
+ // Sync should cancel & forget the container when the
+ // "unknown" node goes away.
+ //
+ // (As above, cancel+forget is async and requires multiple
+ // sync() calls, but stubs are fast so in practice this takes
+ // much less than 1s to complete.)
+ pool.unknown = nil
+ for deadline := time.Now().Add(time.Second); ; time.Sleep(time.Millisecond) {
+ sch.sync()
+ ents, _ = queue.Entries()
+ if len(ents) == 0 || time.Now().After(deadline) {
+ break
+ }
+ }
+ c.Check(ents, check.HasLen, 0)
+}