"git.arvados.org/arvados.git/lib/cloud"
"git.arvados.org/arvados.git/lib/crunchrun"
"git.arvados.org/arvados.git/sdk/go/arvados"
+ "github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"golang.org/x/crypto/ssh"
)
// SetupVM, if set, is called upon creation of each new
// StubVM. This is the caller's opportunity to customize the
// VM's error rate and other behaviors.
- SetupVM func(*StubVM)
+ //
+ // If SetupVM returns an error, that error will be returned to
+ // the caller of Create(), and the new VM will be discarded.
+ SetupVM func(*StubVM) error
// Bugf, if set, is called if a bug is detected in the caller
// or stub. Typically set to (*check.C)Errorf. If unset,
MinTimeBetweenCreateCalls time.Duration
MinTimeBetweenInstancesCalls time.Duration
+ QuotaMaxInstances int
+
// If true, Create and Destroy calls block until Release() is
// called.
HoldCloudOps bool
}
// InstanceSet returns a new *StubInstanceSet.
-func (sd *StubDriver) InstanceSet(params json.RawMessage, id cloud.InstanceSetID, _ cloud.SharedResourceTags, logger logrus.FieldLogger) (cloud.InstanceSet, error) {
+func (sd *StubDriver) InstanceSet(params json.RawMessage, id cloud.InstanceSetID, _ cloud.SharedResourceTags, logger logrus.FieldLogger, reg *prometheus.Registry) (cloud.InstanceSet, error) {
if sd.holdCloudOps == nil {
sd.holdCloudOps = make(chan bool)
}
if math_rand.Float64() < sis.driver.ErrorRateCreate {
return nil, fmt.Errorf("StubInstanceSet: rand < ErrorRateCreate %f", sis.driver.ErrorRateCreate)
}
+ if max := sis.driver.QuotaMaxInstances; max > 0 && len(sis.servers) >= max {
+ return nil, QuotaError{fmt.Errorf("StubInstanceSet: reached QuotaMaxInstances %d", max)}
+ }
sis.allowCreateCall = time.Now().Add(sis.driver.MinTimeBetweenCreateCalls)
ak := sis.driver.AuthorizedKeys
if authKey != nil {
Exec: svm.Exec,
}
if setup := sis.driver.SetupVM; setup != nil {
- setup(svm)
+ err := setup(svm)
+ if err != nil {
+ return nil, err
+ }
}
sis.servers[svm.id] = svm
return svm.Instance(), nil
// Error implements the error interface. The message includes the
// Retry value carried by the error.
func (e RateLimitError) Error() string {
	until := e.Retry
	return fmt.Sprintf("rate limited until %s", until)
}
// EarliestRetry returns the Retry time carried by the error.
func (e RateLimitError) EarliestRetry() time.Time {
	return e.Retry
}
// CapacityError is an error value reporting insufficient capacity.
// Its InstanceTypeSpecific field is the value reported by
// IsInstanceTypeSpecific.
type CapacityError struct {
	InstanceTypeSpecific bool
}

// Error implements the error interface with a fixed message.
func (e CapacityError) Error() string {
	return "insufficient capacity"
}

// IsCapacityError always reports true.
func (e CapacityError) IsCapacityError() bool {
	return true
}

// IsInstanceTypeSpecific reports the value of the
// InstanceTypeSpecific field.
func (e CapacityError) IsInstanceTypeSpecific() bool {
	return e.InstanceTypeSpecific
}
+
// StubVM is a fake server that runs an SSH service. It represents a
// VM running in a fake cloud.
//
killing map[string]bool
lastPID int64
deadlocked string
+ stubprocs sync.WaitGroup
+ destroying bool
sync.Mutex
}
}
func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
+ // Ensure we don't start any new stubprocs after Destroy()
+ // has started Wait()ing for stubprocs to end.
+ svm.Lock()
+ if svm.destroying {
+ svm.Unlock()
+ return 1
+ }
+ svm.stubprocs.Add(1)
+ defer svm.stubprocs.Done()
+ svm.Unlock()
+
stdinData, err := ioutil.ReadAll(stdin)
if err != nil {
fmt.Fprintf(stderr, "error reading stdin: %s\n", err)
pid := svm.lastPID
svm.running[uuid] = stubProcess{pid: pid}
svm.Unlock()
+
time.Sleep(svm.CrunchRunDetachDelay)
+
+ svm.Lock()
+ defer svm.Unlock()
+ if svm.destroying {
+ fmt.Fprint(stderr, "crunch-run: killed by system shutdown\n")
+ return 9
+ }
fmt.Fprintf(stderr, "starting %s\n", uuid)
logger := svm.sis.logger.WithFields(logrus.Fields{
"Instance": svm.id,
"PID": pid,
})
logger.Printf("[test] starting crunch-run stub")
+ svm.stubprocs.Add(1)
go func() {
+ defer svm.stubprocs.Done()
var ctr arvados.Container
var started, completed bool
defer func() {
logger.Print("[test] exiting crunch-run stub")
svm.Lock()
defer svm.Unlock()
+ if svm.destroying {
+ return
+ }
if svm.running[uuid].pid != pid {
bugf := svm.sis.driver.Bugf
if bugf == nil {
svm.Lock()
killed := svm.killing[uuid]
+ delete(svm.killing, uuid)
+ destroying := svm.destroying
svm.Unlock()
- if killed || wantCrashEarly {
+ if killed || wantCrashEarly || destroying {
return
}
if math_rand.Float64() < si.svm.sis.driver.ErrorRateDestroy {
return errors.New("instance could not be destroyed")
}
+ si.svm.Lock()
+ si.svm.destroying = true
+ si.svm.Unlock()
+ si.svm.stubprocs.Wait()
si.svm.SSHService.Close()
sis.mtx.Lock()
defer sis.mtx.Unlock()
// PriceHistory ignores its instance type argument and always returns
// a nil slice: the stub reports no price history.
func (si stubInstance) PriceHistory(_ arvados.InstanceType) []cloud.InstancePrice {
	var none []cloud.InstancePrice
	return none
}
+
// QuotaError wraps an error and marks it as a quota error. The
// Error method is promoted from the embedded error value.
type QuotaError struct {
	error
}

// IsQuotaError always reports true.
func (QuotaError) IsQuotaError() bool { return true }

// Unwrap returns the wrapped error, so that errors.Is and errors.As
// can inspect it through the QuotaError wrapper. Embedding alone
// promotes Error() but does not make the wrapped error reachable.
func (e QuotaError) Unwrap() error { return e.error }