ChooseType: func(ctr *arvados.Container) (arvados.InstanceType, error) {
return ChooseInstanceType(s.cluster, ctr)
},
+ Logger: ctxlog.TestLogger(c),
}
for i := 0; i < 200; i++ {
queue.Containers = append(queue.Containers, arvados.Container{
stubvm.CrunchRunCrashRate = 0.1
}
}
+ s.stubDriver.Bugf = c.Errorf
start := time.Now()
go s.disp.run()
time.Sleep(time.Millisecond)
}
c.Assert(len(sr.Items), check.Equals, 1)
- c.Check(sr.Items[0].Instance, check.Matches, "stub.*")
+ c.Check(sr.Items[0].Instance, check.Matches, "inst.*")
c.Check(sr.Items[0].WorkerState, check.Equals, "booting")
c.Check(sr.Items[0].Price, check.Equals, 0.123)
c.Check(sr.Items[0].LastContainerUUID, check.Equals, "")
// a higher-priority container on the
// same instance type. Don't let this
// one sneak in ahead of it.
+ } else if sch.pool.KillContainer(ctr.UUID, "about to lock") {
+ logger.Info("not restarting yet: crunch-run process from previous attempt has not exited")
} else if sch.pool.StartContainer(it, ctr) {
// Success.
} else {
// KillContainer removes uuid from p.running while holding the pool
// lock, and reports whether uuid had an entry in p.running whose
// value's IsZero() is true. It returns false when uuid has no entry.
// The reason argument is not used by this stub.
//
// NOTE(review): the entry value is presumably a time.Time where the
// zero value has a specific meaning to the tests -- confirm against
// the declaration of stubPool.running.
func (p *stubPool) KillContainer(uuid, reason string) bool {
p.Lock()
defer p.Unlock()
- delete(p.running, uuid)
- return true
// The delete is deferred so that the lookup below still sees the
// entry; deferred calls run in reverse order, so the delete happens
// before the Unlock deferred above.
+ defer delete(p.running, uuid)
+ t, ok := p.running[uuid]
+ return ok && t.IsZero()
}
func (p *stubPool) Shutdown(arvados.InstanceType) bool {
p.shutdowns++
}
// kill asks the pool to kill the process for the container with the
// given uuid and then to forget that container. It first takes the
// per-UUID lock via uuidLock (tagged "kill") and does nothing if
// uuidLock returns false; the lock is released when kill returns.
// The boolean result of pool.KillContainer is ignored.
//
// NOTE(review): uuidLock presumably returns false when another
// operation on the same UUID is already in progress -- confirm
// against its definition.
func (sch *Scheduler) kill(uuid string, reason string) {
+ if !sch.uuidLock(uuid, "kill") {
+ return
+ }
+ defer sch.uuidUnlock(uuid)
sch.pool.KillContainer(uuid, reason)
sch.pool.ForgetContainer(uuid)
}
func (sch *Scheduler) requeue(ent container.QueueEnt, reason string) {
uuid := ent.Container.UUID
- if !sch.uuidLock(uuid, "cancel") {
+ if !sch.uuidLock(uuid, "requeue") {
return
}
defer sch.uuidUnlock(uuid)
"git.arvados.org/arvados.git/lib/dispatchcloud/container"
"git.arvados.org/arvados.git/sdk/go/arvados"
+ "github.com/sirupsen/logrus"
)
// Queue is a test stub for container.Queue. The caller specifies the
// must not be nil.
ChooseType func(*arvados.Container) (arvados.InstanceType, error)
+ Logger logrus.FieldLogger
+
entries map[string]container.QueueEnt
updTime time.Time
subscribers map[<-chan struct{}]chan struct{}
defer q.mtx.Unlock()
for i, ctr := range q.Containers {
if ctr.UUID == upd.UUID {
- if ctr.State != arvados.ContainerStateComplete && ctr.State != arvados.ContainerStateCancelled {
+ if allowContainerUpdate[ctr.State][upd.State] {
q.Containers[i] = upd
return true
+ } else {
+ if q.Logger != nil {
+ q.Logger.WithField("ContainerUUID", ctr.UUID).Infof("test.Queue rejected update from %s to %s", ctr.State, upd.State)
+ }
+ return false
}
- return false
}
}
q.Containers = append(q.Containers, upd)
return true
}
+
// allowContainerUpdate is the table of container state transitions
// the test queue accepts: allowContainerUpdate[from][to] is true when
// an update that moves a container from state "from" to state "to" is
// permitted. The "from" states with entries are Queued, Locked and
// Running. Any other "from" state (including Complete and Cancelled,
// which have no entry here) yields false for every "to" state,
// because indexing a missing outer key gives a nil inner map and
// indexing a nil map gives the zero value.
+var allowContainerUpdate = map[arvados.ContainerState]map[arvados.ContainerState]bool{
+ arvados.ContainerStateQueued: map[arvados.ContainerState]bool{
+ arvados.ContainerStateQueued: true,
+ arvados.ContainerStateLocked: true,
+ arvados.ContainerStateCancelled: true,
+ },
+ arvados.ContainerStateLocked: map[arvados.ContainerState]bool{
+ arvados.ContainerStateQueued: true,
+ arvados.ContainerStateLocked: true,
+ arvados.ContainerStateRunning: true,
+ arvados.ContainerStateCancelled: true,
+ },
+ arvados.ContainerStateRunning: map[arvados.ContainerState]bool{
+ arvados.ContainerStateRunning: true,
+ arvados.ContainerStateCancelled: true,
+ arvados.ContainerStateComplete: true,
+ },
+}
// VM's error rate and other behaviors.
SetupVM func(*StubVM)
+ // Bugf, if set, is called if a bug is detected in the caller
+ // or stub. Typically set to (*check.C)Errorf. If unset,
+ // logger.Warnf is called instead.
+ Bugf func(string, ...interface{})
+
// StubVM's fake crunch-run uses this Queue to read and update
// container state.
Queue *Queue
allowCreateCall time.Time
allowInstancesCall time.Time
+ lastInstanceID int
}
func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, cmd cloud.InitCommand, authKey ssh.PublicKey) (cloud.Instance, error) {
if authKey != nil {
ak = append([]ssh.PublicKey{authKey}, ak...)
}
+ sis.lastInstanceID++
svm := &StubVM{
sis: sis,
- id: cloud.InstanceID(fmt.Sprintf("stub-%s-%x", it.ProviderType, math_rand.Int63())),
+ id: cloud.InstanceID(fmt.Sprintf("inst%d,%s", sis.lastInstanceID, it.ProviderType)),
tags: copyTags(tags),
providerType: it.ProviderType,
initCommand: cmd,
})
logger.Printf("[test] starting crunch-run stub")
go func() {
+ var ctr arvados.Container
+ var started, completed bool
+ defer func() {
+ logger.Print("[test] exiting crunch-run stub")
+ svm.Lock()
+ defer svm.Unlock()
+ if svm.running[uuid] != pid {
+ if !completed {
+ bugf := svm.sis.driver.Bugf
+ if bugf == nil {
+ bugf = logger.Warnf
+ }
+ bugf("[test] StubDriver bug or caller bug: pid %d exiting, running[%s]==%d", pid, uuid, svm.running[uuid])
+ }
+ } else {
+ delete(svm.running, uuid)
+ }
+ if !completed {
+ logger.WithField("State", ctr.State).Print("[test] crashing crunch-run stub")
+ if started && svm.CrashRunningContainer != nil {
+ svm.CrashRunningContainer(ctr)
+ }
+ }
+ }()
+
crashluck := math_rand.Float64()
+ wantCrash := crashluck < svm.CrunchRunCrashRate
+ wantCrashEarly := crashluck < svm.CrunchRunCrashRate/2
+
ctr, ok := queue.Get(uuid)
if !ok {
logger.Print("[test] container not in queue")
return
}
- defer func() {
- if ctr.State == arvados.ContainerStateRunning && svm.CrashRunningContainer != nil {
- svm.CrashRunningContainer(ctr)
- }
- }()
-
- if crashluck > svm.CrunchRunCrashRate/2 {
- time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
- ctr.State = arvados.ContainerStateRunning
- if !queue.Notify(ctr) {
- ctr, _ = queue.Get(uuid)
- logger.Print("[test] erroring out because state=Running update was rejected")
- return
- }
- }
-
time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
svm.Lock()
- defer svm.Unlock()
- if svm.running[uuid] != pid {
- logger.Print("[test] container was killed")
+ killed := svm.running[uuid] != pid
+ svm.Unlock()
+ if killed || wantCrashEarly {
return
}
- delete(svm.running, uuid)
- if crashluck < svm.CrunchRunCrashRate {
+ ctr.State = arvados.ContainerStateRunning
+ started = queue.Notify(ctr)
+ if !started {
+ ctr, _ = queue.Get(uuid)
+ logger.Print("[test] erroring out because state=Running update was rejected")
+ return
+ }
+
+ if wantCrash {
logger.WithField("State", ctr.State).Print("[test] crashing crunch-run stub")
- } else {
- if svm.ExecuteContainer != nil {
- ctr.ExitCode = svm.ExecuteContainer(ctr)
- }
- logger.WithField("ExitCode", ctr.ExitCode).Print("[test] exiting crunch-run stub")
- ctr.State = arvados.ContainerStateComplete
- go queue.Notify(ctr)
+ return
+ }
+ if svm.ExecuteContainer != nil {
+ ctr.ExitCode = svm.ExecuteContainer(ctr)
}
+ logger.WithField("ExitCode", ctr.ExitCode).Print("[test] completing container")
+ ctr.State = arvados.ContainerStateComplete
+ completed = queue.Notify(ctr)
}()
return 0
}