"time"
"git.arvados.org/arvados.git/lib/cloud"
+ "git.arvados.org/arvados.git/lib/crunchrun"
"git.arvados.org/arvados.git/sdk/go/arvados"
+ "github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"golang.org/x/crypto/ssh"
)
// SetupVM, if set, is called upon creation of each new
// StubVM. This is the caller's opportunity to customize the
// VM's error rate and other behaviors.
- SetupVM func(*StubVM)
+ //
+ // If SetupVM returns an error, that error will be returned to
+ // the caller of Create(), and the new VM will be discarded.
+ SetupVM func(*StubVM) error
+
+ // Bugf, if set, is called if a bug is detected in the caller
+ // or stub. Typically set to (*check.C)Errorf. If unset,
+ // logger.Warnf is called instead.
+ Bugf func(string, ...interface{})
// StubVM's fake crunch-run uses this Queue to read and update
// container state.
Queue *Queue
// Frequency of artificially introduced errors on calls to
- // Destroy. 0=always succeed, 1=always fail.
+ // Create and Destroy. 0=always succeed, 1=always fail.
+ ErrorRateCreate float64
ErrorRateDestroy float64
// If Create() or Instances() is called too frequently, return
MinTimeBetweenCreateCalls time.Duration
MinTimeBetweenInstancesCalls time.Duration
+ QuotaMaxInstances int
+
// If true, Create and Destroy calls block until Release() is
// called.
HoldCloudOps bool
}
// InstanceSet returns a new *StubInstanceSet.
-func (sd *StubDriver) InstanceSet(params json.RawMessage, id cloud.InstanceSetID, _ cloud.SharedResourceTags, logger logrus.FieldLogger) (cloud.InstanceSet, error) {
+func (sd *StubDriver) InstanceSet(params json.RawMessage, id cloud.InstanceSetID, _ cloud.SharedResourceTags, logger logrus.FieldLogger, reg *prometheus.Registry) (cloud.InstanceSet, error) {
if sd.holdCloudOps == nil {
sd.holdCloudOps = make(chan bool)
}
lastInstanceID int
}
-func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, cmd cloud.InitCommand, authKey ssh.PublicKey) (cloud.Instance, error) {
+func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, initCommand cloud.InitCommand, authKey ssh.PublicKey) (cloud.Instance, error) {
if sis.driver.HoldCloudOps {
sis.driver.holdCloudOps <- true
}
}
if sis.allowCreateCall.After(time.Now()) {
return nil, RateLimitError{sis.allowCreateCall}
- } else {
- sis.allowCreateCall = time.Now().Add(sis.driver.MinTimeBetweenCreateCalls)
}
-
+ if math_rand.Float64() < sis.driver.ErrorRateCreate {
+ return nil, fmt.Errorf("StubInstanceSet: rand < ErrorRateCreate %f", sis.driver.ErrorRateCreate)
+ }
+ if max := sis.driver.QuotaMaxInstances; max > 0 && len(sis.servers) >= max {
+ return nil, QuotaError{fmt.Errorf("StubInstanceSet: reached QuotaMaxInstances %d", max)}
+ }
+ sis.allowCreateCall = time.Now().Add(sis.driver.MinTimeBetweenCreateCalls)
ak := sis.driver.AuthorizedKeys
if authKey != nil {
ak = append([]ssh.PublicKey{authKey}, ak...)
}
sis.lastInstanceID++
svm := &StubVM{
+ InitCommand: initCommand,
sis: sis,
id: cloud.InstanceID(fmt.Sprintf("inst%d,%s", sis.lastInstanceID, it.ProviderType)),
tags: copyTags(tags),
providerType: it.ProviderType,
- initCommand: cmd,
- running: map[string]int64{},
+ running: map[string]stubProcess{},
killing: map[string]bool{},
}
svm.SSHService = SSHService{
Exec: svm.Exec,
}
if setup := sis.driver.SetupVM; setup != nil {
- setup(svm)
+ err := setup(svm)
+ if err != nil {
+ return nil, err
+ }
}
sis.servers[svm.id] = svm
return svm.Instance(), nil
defer sis.mtx.RUnlock()
if sis.allowInstancesCall.After(time.Now()) {
return nil, RateLimitError{sis.allowInstancesCall}
- } else {
- sis.allowInstancesCall = time.Now().Add(sis.driver.MinTimeBetweenInstancesCalls)
}
+ sis.allowInstancesCall = time.Now().Add(sis.driver.MinTimeBetweenInstancesCalls)
var r []cloud.Instance
for _, ss := range sis.servers {
r = append(r, ss.Instance())
sis.stopped = true
}
+ // StubVMs returns the StubVM for every instance currently held in
+ // sis.servers, in no particular order (map iteration order). It only
+ // reads sis.servers, so it takes the read lock, matching Instances().
+ func (sis *StubInstanceSet) StubVMs() (svms []*StubVM) {
+ 	sis.mtx.RLock()
+ 	defer sis.mtx.RUnlock()
+ 	for _, vm := range sis.servers {
+ 		svms = append(svms, vm)
+ 	}
+ 	return svms
+ }
+
// RateLimitError is returned by the stub instance set when Create or
// Instances is called before the earliest allowed time. Retry holds
// that time.
type RateLimitError struct {
	Retry time.Time
}

// Error describes the rate limit, including the retry time.
func (e RateLimitError) Error() string {
	return fmt.Sprintf("rate limited until %s", e.Retry)
}

// EarliestRetry reports the time at which the call may be retried.
func (e RateLimitError) EarliestRetry() time.Time {
	return e.Retry
}
+ // CapacityError reports insufficient capacity. InstanceTypeSpecific
+ // is exposed via IsInstanceTypeSpecific; presumably it tells callers
+ // whether only the requested instance type is affected (confirm
+ // against the consumer of this interface).
+ type CapacityError struct {
+ 	InstanceTypeSpecific bool
+ }
+
+ // Error returns a fixed message.
+ func (e CapacityError) Error() string {
+ 	return "insufficient capacity"
+ }
+
+ // IsCapacityError always reports true.
+ func (e CapacityError) IsCapacityError() bool {
+ 	return true
+ }
+
+ // IsInstanceTypeSpecific returns the InstanceTypeSpecific field.
+ func (e CapacityError) IsInstanceTypeSpecific() bool {
+ 	return e.InstanceTypeSpecific
+ }
+
// StubVM is a fake server that runs an SSH service. It represents a
// VM running in a fake cloud.
//
CrunchRunMissing bool
CrunchRunCrashRate float64
CrunchRunDetachDelay time.Duration
+ ArvMountMaxExitLag time.Duration
+ ArvMountDeadlockRate float64
ExecuteContainer func(arvados.Container) int
CrashRunningContainer func(arvados.Container)
+ ExtraCrunchRunArgs string // extra args expected after "crunch-run --detach --stdin-config "
+
+ // Populated by (*StubInstanceSet)Create()
+ InitCommand cloud.InitCommand
sis *StubInstanceSet
id cloud.InstanceID
tags cloud.InstanceTags
- initCommand cloud.InitCommand
providerType string
SSHService SSHService
- running map[string]int64
+ running map[string]stubProcess
killing map[string]bool
lastPID int64
+ deadlocked string
+ stubprocs sync.WaitGroup
+ destroying bool
sync.Mutex
}
+ // stubProcess is the per-container record kept in StubVM.running for
+ // one fake crunch-run process.
+ type stubProcess struct {
+ 	// pid is the fake process ID, taken from StubVM.lastPID.
+ 	pid int64
+
+ 	// exited is set once crunch-run itself has exited while
+ 	// something (e.g. arv-mount) still holds its lock in /var/run/;
+ 	// "crunch-run --list" then reports the container as stale.
+ 	exited bool
+ }
+
func (svm *StubVM) Instance() stubInstance {
svm.Lock()
defer svm.Unlock()
}
func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
+ // Ensure we don't start any new stubprocs after Destroy()
+ // has started Wait()ing for stubprocs to end.
+ svm.Lock()
+ if svm.destroying {
+ svm.Unlock()
+ return 1
+ }
+ svm.stubprocs.Add(1)
+ defer svm.stubprocs.Done()
+ svm.Unlock()
+
stdinData, err := ioutil.ReadAll(stdin)
if err != nil {
fmt.Fprintf(stderr, "error reading stdin: %s\n", err)
fmt.Fprint(stderr, "crunch-run: command not found\n")
return 1
}
- if strings.HasPrefix(command, "crunch-run --detach --stdin-env ") {
- var stdinKV map[string]string
- err := json.Unmarshal(stdinData, &stdinKV)
+ if strings.HasPrefix(command, "crunch-run --detach --stdin-config "+svm.ExtraCrunchRunArgs) {
+ var configData crunchrun.ConfigData
+ err := json.Unmarshal(stdinData, &configData)
if err != nil {
fmt.Fprintf(stderr, "unmarshal stdin: %s (stdin was: %q)\n", err, stdinData)
return 1
}
for _, name := range []string{"ARVADOS_API_HOST", "ARVADOS_API_TOKEN"} {
- if stdinKV[name] == "" {
+ if configData.Env[name] == "" {
fmt.Fprintf(stderr, "%s env var missing from stdin %q\n", name, stdinData)
return 1
}
svm.Lock()
svm.lastPID++
pid := svm.lastPID
- svm.running[uuid] = pid
+ svm.running[uuid] = stubProcess{pid: pid}
svm.Unlock()
+
time.Sleep(svm.CrunchRunDetachDelay)
+
+ svm.Lock()
+ defer svm.Unlock()
+ if svm.destroying {
+ fmt.Fprint(stderr, "crunch-run: killed by system shutdown\n")
+ return 9
+ }
fmt.Fprintf(stderr, "starting %s\n", uuid)
logger := svm.sis.logger.WithFields(logrus.Fields{
"Instance": svm.id,
"PID": pid,
})
logger.Printf("[test] starting crunch-run stub")
+ svm.stubprocs.Add(1)
go func() {
+ defer svm.stubprocs.Done()
+ var ctr arvados.Container
+ var started, completed bool
+ defer func() {
+ 	logger.Print("[test] exiting crunch-run stub")
+ 	svm.Lock()
+ 	defer svm.Unlock()
+ 	if svm.destroying {
+ 		// Destroy() has started; leave process state alone.
+ 		return
+ 	}
+ 	if svm.running[uuid].pid != pid {
+ 		bugf := svm.sis.driver.Bugf
+ 		if bugf == nil {
+ 			bugf = logger.Warnf
+ 		}
+ 		bugf("[test] StubDriver bug or caller bug: pid %d exiting, running[%s].pid==%d", pid, uuid, svm.running[uuid].pid)
+ 		return
+ 	}
+ 	if !completed {
+ 		logger.WithField("State", ctr.State).Print("[test] crashing crunch-run stub")
+ 		if started && svm.CrashRunningContainer != nil {
+ 			svm.CrashRunningContainer(ctr)
+ 		}
+ 	}
+ 	// Mark crunch-run as exited but keep the entry, as if arv-mount
+ 	// (or something) still held the lock; "--list" shows it stale.
+ 	sproc := svm.running[uuid]
+ 	sproc.exited = true
+ 	svm.running[uuid] = sproc
+ 	svm.Unlock()
+ 	// Scale the max lag by a random fraction in [0,1). Converting the
+ 	// fraction itself to time.Duration would truncate it to 0 and
+ 	// make the lag always zero.
+ 	lag := time.Duration(float64(svm.ArvMountMaxExitLag) * math_rand.Float64())
+ 	time.Sleep(lag)
+ 	svm.Lock()
+ 	// The lock was released while sleeping, so the entry may now
+ 	// belong to a newer crunch-run process for the same container;
+ 	// only remove our own. With probability ArvMountDeadlockRate the
+ 	// stale entry is left in place (presumably simulating a stuck
+ 	// arv-mount).
+ 	if svm.running[uuid].pid == pid && math_rand.Float64() >= svm.ArvMountDeadlockRate {
+ 		delete(svm.running, uuid)
+ 	}
+ }()
+
crashluck := math_rand.Float64()
+ wantCrash := crashluck < svm.CrunchRunCrashRate
+ wantCrashEarly := crashluck < svm.CrunchRunCrashRate/2
+
ctr, ok := queue.Get(uuid)
if !ok {
logger.Print("[test] container not in queue")
return
}
- defer func() {
- if ctr.State == arvados.ContainerStateRunning && svm.CrashRunningContainer != nil {
- svm.CrashRunningContainer(ctr)
- }
- }()
-
- if crashluck > svm.CrunchRunCrashRate/2 {
- time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
- ctr.State = arvados.ContainerStateRunning
- if !queue.Notify(ctr) {
- ctr, _ = queue.Get(uuid)
- logger.Print("[test] erroring out because state=Running update was rejected")
- return
- }
- }
-
time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
svm.Lock()
- defer svm.Unlock()
- if svm.running[uuid] != pid {
- logger.Print("[test] container was killed")
+ killed := svm.killing[uuid]
+ delete(svm.killing, uuid)
+ destroying := svm.destroying
+ svm.Unlock()
+ if killed || wantCrashEarly || destroying {
+ return
+ }
+
+ ctr.State = arvados.ContainerStateRunning
+ started = queue.Notify(ctr)
+ if !started {
+ ctr, _ = queue.Get(uuid)
+ logger.Print("[test] erroring out because state=Running update was rejected")
return
}
- delete(svm.running, uuid)
- if crashluck < svm.CrunchRunCrashRate {
+ if wantCrash {
logger.WithField("State", ctr.State).Print("[test] crashing crunch-run stub")
- } else {
- if svm.ExecuteContainer != nil {
- ctr.ExitCode = svm.ExecuteContainer(ctr)
- }
- logger.WithField("ExitCode", ctr.ExitCode).Print("[test] exiting crunch-run stub")
- ctr.State = arvados.ContainerStateComplete
- go queue.Notify(ctr)
+ return
}
+ if svm.ExecuteContainer != nil {
+ ctr.ExitCode = svm.ExecuteContainer(ctr)
+ }
+ logger.WithField("ExitCode", ctr.ExitCode).Print("[test] completing container")
+ ctr.State = arvados.ContainerStateComplete
+ completed = queue.Notify(ctr)
}()
return 0
}
if command == "crunch-run --list" {
svm.Lock()
defer svm.Unlock()
- for uuid := range svm.running {
- fmt.Fprintf(stdout, "%s\n", uuid)
+ for uuid, sproc := range svm.running {
+ if sproc.exited {
+ fmt.Fprintf(stdout, "%s stale\n", uuid)
+ } else {
+ fmt.Fprintf(stdout, "%s\n", uuid)
+ }
}
if !svm.ReportBroken.IsZero() && svm.ReportBroken.Before(time.Now()) {
fmt.Fprintln(stdout, "broken")
}
+ fmt.Fprintln(stdout, svm.deadlocked)
return 0
}
if strings.HasPrefix(command, "crunch-run --kill ") {
svm.Lock()
- pid, running := svm.running[uuid]
- if running && !svm.killing[uuid] {
+ sproc, running := svm.running[uuid]
+ if running && !sproc.exited {
svm.killing[uuid] = true
- go func() {
- time.Sleep(time.Duration(math_rand.Float64()*30) * time.Millisecond)
- svm.Lock()
- defer svm.Unlock()
- if svm.running[uuid] == pid {
- // Kill only if the running entry
- // hasn't since been killed and
- // replaced with a different one.
- delete(svm.running, uuid)
- }
- delete(svm.killing, uuid)
- }()
svm.Unlock()
time.Sleep(time.Duration(math_rand.Float64()*2) * time.Millisecond)
svm.Lock()
- _, running = svm.running[uuid]
+ sproc, running = svm.running[uuid]
}
svm.Unlock()
- if running {
+ if running && !sproc.exited {
fmt.Fprintf(stderr, "%s: container is running\n", uuid)
return 1
- } else {
- fmt.Fprintf(stderr, "%s: container is not running\n", uuid)
- return 0
}
+ fmt.Fprintf(stderr, "%s: container is not running\n", uuid)
+ return 0
}
if command == "true" {
return 0
if math_rand.Float64() < si.svm.sis.driver.ErrorRateDestroy {
return errors.New("instance could not be destroyed")
}
+ si.svm.Lock()
+ si.svm.destroying = true
+ si.svm.Unlock()
+ si.svm.stubprocs.Wait()
si.svm.SSHService.Close()
sis.mtx.Lock()
defer sis.mtx.Unlock()
}
return dst
}
+
+func (si stubInstance) PriceHistory(arvados.InstanceType) []cloud.InstancePrice {
+ return nil
+}
+
+ // QuotaError wraps an underlying error (whose Error method is
+ // promoted through embedding) and marks it as a quota error. Create
+ // returns one when QuotaMaxInstances has been reached.
+ type QuotaError struct{ error }
+
+ // IsQuotaError always reports true.
+ func (qe QuotaError) IsQuotaError() bool {
+ 	return true
+ }