"time"
"git.arvados.org/arvados.git/lib/cloud"
+ "git.arvados.org/arvados.git/lib/crunchrun"
"git.arvados.org/arvados.git/sdk/go/arvados"
+ "github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"golang.org/x/crypto/ssh"
)
// SetupVM, if set, is called upon creation of each new
// StubVM. This is the caller's opportunity to customize the
// VM's error rate and other behaviors.
- SetupVM func(*StubVM)
+ //
+ // If SetupVM returns an error, that error will be returned to
+ // the caller of Create(), and the new VM will be discarded.
+ SetupVM func(*StubVM) error
// Bugf, if set, is called if a bug is detected in the caller
// or stub. Typically set to (*check.C)Errorf. If unset,
Queue *Queue
// Frequency of artificially introduced errors on calls to
- // Destroy. 0=always succeed, 1=always fail.
+ // Create and Destroy. 0=always succeed, 1=always fail.
+ ErrorRateCreate float64
ErrorRateDestroy float64
// If Create() or Instances() is called too frequently, return
MinTimeBetweenCreateCalls time.Duration
MinTimeBetweenInstancesCalls time.Duration
+ QuotaMaxInstances int
+
// If true, Create and Destroy calls block until Release() is
// called.
HoldCloudOps bool
}
// InstanceSet returns a new *StubInstanceSet.
-func (sd *StubDriver) InstanceSet(params json.RawMessage, id cloud.InstanceSetID, _ cloud.SharedResourceTags, logger logrus.FieldLogger) (cloud.InstanceSet, error) {
+func (sd *StubDriver) InstanceSet(params json.RawMessage, id cloud.InstanceSetID, _ cloud.SharedResourceTags, logger logrus.FieldLogger, reg *prometheus.Registry) (cloud.InstanceSet, error) {
if sd.holdCloudOps == nil {
sd.holdCloudOps = make(chan bool)
}
lastInstanceID int
}
-func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, cmd cloud.InitCommand, authKey ssh.PublicKey) (cloud.Instance, error) {
+func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, initCommand cloud.InitCommand, authKey ssh.PublicKey) (cloud.Instance, error) {
if sis.driver.HoldCloudOps {
sis.driver.holdCloudOps <- true
}
if sis.allowCreateCall.After(time.Now()) {
return nil, RateLimitError{sis.allowCreateCall}
}
+ if math_rand.Float64() < sis.driver.ErrorRateCreate {
+ return nil, fmt.Errorf("StubInstanceSet: rand < ErrorRateCreate %f", sis.driver.ErrorRateCreate)
+ }
+ if max := sis.driver.QuotaMaxInstances; max > 0 && len(sis.servers) >= max {
+ return nil, QuotaError{fmt.Errorf("StubInstanceSet: reached QuotaMaxInstances %d", max)}
+ }
sis.allowCreateCall = time.Now().Add(sis.driver.MinTimeBetweenCreateCalls)
ak := sis.driver.AuthorizedKeys
if authKey != nil {
}
sis.lastInstanceID++
svm := &StubVM{
+ InitCommand: initCommand,
sis: sis,
id: cloud.InstanceID(fmt.Sprintf("inst%d,%s", sis.lastInstanceID, it.ProviderType)),
tags: copyTags(tags),
providerType: it.ProviderType,
- initCommand: cmd,
running: map[string]stubProcess{},
killing: map[string]bool{},
}
Exec: svm.Exec,
}
if setup := sis.driver.SetupVM; setup != nil {
- setup(svm)
+ err := setup(svm)
+ if err != nil {
+ return nil, err
+ }
}
sis.servers[svm.id] = svm
return svm.Instance(), nil
sis.stopped = true
}
+func (sis *StubInstanceSet) StubVMs() (svms []*StubVM) {
+ sis.mtx.Lock()
+ defer sis.mtx.Unlock()
+ for _, vm := range sis.servers {
+ svms = append(svms, vm)
+ }
+ return
+}
+
type RateLimitError struct{ Retry time.Time }
func (e RateLimitError) Error() string { return fmt.Sprintf("rate limited until %s", e.Retry) }
func (e RateLimitError) EarliestRetry() time.Time { return e.Retry }
+type CapacityError struct{ InstanceTypeSpecific bool }
+
+func (e CapacityError) Error() string { return "insufficient capacity" }
+func (e CapacityError) IsCapacityError() bool { return true }
+func (e CapacityError) IsInstanceTypeSpecific() bool { return e.InstanceTypeSpecific }
+
// StubVM is a fake server that runs an SSH service. It represents a
// VM running in a fake cloud.
//
ArvMountDeadlockRate float64
ExecuteContainer func(arvados.Container) int
CrashRunningContainer func(arvados.Container)
+ ExtraCrunchRunArgs string // extra args expected after "crunch-run --detach --stdin-config "
+
+ // Populated by (*StubInstanceSet)Create()
+ InitCommand cloud.InitCommand
sis *StubInstanceSet
id cloud.InstanceID
tags cloud.InstanceTags
- initCommand cloud.InitCommand
providerType string
SSHService SSHService
running map[string]stubProcess
fmt.Fprint(stderr, "crunch-run: command not found\n")
return 1
}
- if strings.HasPrefix(command, "crunch-run --detach --stdin-env ") {
- var stdinKV map[string]string
- err := json.Unmarshal(stdinData, &stdinKV)
+ if strings.HasPrefix(command, "crunch-run --detach --stdin-config "+svm.ExtraCrunchRunArgs) {
+ var configData crunchrun.ConfigData
+ err := json.Unmarshal(stdinData, &configData)
if err != nil {
fmt.Fprintf(stderr, "unmarshal stdin: %s (stdin was: %q)\n", err, stdinData)
return 1
}
for _, name := range []string{"ARVADOS_API_HOST", "ARVADOS_API_TOKEN"} {
- if stdinKV[name] == "" {
+ if configData.Env[name] == "" {
fmt.Fprintf(stderr, "%s env var missing from stdin %q\n", name, stdinData)
return 1
}
}
return dst
}
+
+func (si stubInstance) PriceHistory(arvados.InstanceType) []cloud.InstancePrice {
+ return nil
+}
+
+type QuotaError struct {
+ error
+}
+
+func (QuotaError) IsQuotaError() bool { return true }