"net/url"
"os"
"sync"
+ "sync/atomic"
"time"
"git.arvados.org/arvados.git/lib/config"
s.stubDriver = &test.StubDriver{
HostKey: hostpriv,
AuthorizedKeys: []ssh.PublicKey{dispatchpub},
+ ErrorRateCreate: 0.1,
ErrorRateDestroy: 0.1,
MinTimeBetweenCreateCalls: time.Millisecond,
+ QuotaMaxInstances: 10,
}
// We need the postgresql connection info from the integration
DispatchPrivateKey: string(dispatchprivraw),
StaleLockTimeout: arvados.Duration(5 * time.Millisecond),
RuntimeEngine: "stub",
+ MaxDispatchAttempts: 10,
+ MaximumPriceFactor: 1.5,
CloudVMs: arvados.CloudVMsConfig{
Driver: "test",
SyncInterval: arvados.Duration(10 * time.Millisecond),
TimeoutProbe: arvados.Duration(15 * time.Millisecond),
TimeoutShutdown: arvados.Duration(5 * time.Millisecond),
MaxCloudOpsPerSecond: 500,
+ InitialQuotaEstimate: 8,
PollInterval: arvados.Duration(5 * time.Millisecond),
ProbeInterval: arvados.Duration(5 * time.Millisecond),
MaxProbesPerSecond: 1000,
// Disable auto-retry
arvClient.Timeout = 0
- s.error503Server = httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusServiceUnavailable) }))
+ s.error503Server = httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ c.Logf("503 stub: returning 503")
+ w.WriteHeader(http.StatusServiceUnavailable)
+ }))
arvClient.Client = &http.Client{
Transport: &http.Transport{
Proxy: s.arvClientProxy(c),
func (s *DispatcherSuite) arvClientProxy(c *check.C) func(*http.Request) (*url.URL, error) {
return func(req *http.Request) (*url.URL, error) {
if req.URL.Path == "/503" {
+ c.Logf("arvClientProxy: proxying to 503 stub")
return url.Parse(s.error503Server.URL)
} else {
return nil, nil
Drivers["test"] = s.stubDriver
s.disp.setupOnce.Do(s.disp.initialize)
queue := &test.Queue{
- ChooseType: func(ctr *arvados.Container) (arvados.InstanceType, error) {
+ MaxDispatchAttempts: 5,
+ ChooseType: func(ctr *arvados.Container) ([]arvados.InstanceType, error) {
return ChooseInstanceType(s.cluster, ctr)
},
Logger: ctxlog.TestLogger(c),
delete(waiting, ctr.UUID)
if len(waiting) == 100 {
// trigger scheduler maxConcurrency limit
+ c.Logf("test: requesting 503 in order to trigger maxConcurrency limit")
s.disp.ArvClient.RequestAndDecode(nil, "GET", "503", nil, nil)
}
if len(waiting) == 0 {
finishContainer(ctr)
return int(rand.Uint32() & 0x3)
}
+ var countCapacityErrors int64
n := 0
s.stubDriver.Queue = queue
- s.stubDriver.SetupVM = func(stubvm *test.StubVM) {
+ s.stubDriver.SetupVM = func(stubvm *test.StubVM) error {
+ if pt := stubvm.Instance().ProviderType(); pt == test.InstanceType(6).ProviderType {
+ c.Logf("test: returning capacity error for instance type %s", pt)
+ atomic.AddInt64(&countCapacityErrors, 1)
+ return test.CapacityError{InstanceTypeSpecific: true}
+ }
n++
stubvm.Boot = time.Now().Add(time.Duration(rand.Int63n(int64(5 * time.Millisecond))))
stubvm.CrunchRunDetachDelay = time.Duration(rand.Int63n(int64(10 * time.Millisecond)))
stubvm.ExecuteContainer = executeContainer
stubvm.CrashRunningContainer = finishContainer
stubvm.ExtraCrunchRunArgs = "'--runtime-engine=stub' '--foo' '--extra='\\''args'\\'''"
- switch n % 7 {
- case 0:
+ switch {
+ case n%7 == 0:
+ // some instances start out OK but then stop
+ // running any commands
stubvm.Broken = time.Now().Add(time.Duration(rand.Int63n(90)) * time.Millisecond)
- case 1:
+ case n%7 == 1:
+ // some instances never pass a run-probe
stubvm.CrunchRunMissing = true
- case 2:
+ case n%7 == 2:
+ // some instances start out OK but then start
+ // reporting themselves as broken
stubvm.ReportBroken = time.Now().Add(time.Duration(rand.Int63n(200)) * time.Millisecond)
+ case n == 3:
+ // 1 instance is completely broken, ensuring
+ // the boot_outcomes{outcome="failure"} metric
+ // is not zero
+ stubvm.CrunchRunCrashRate = 1
default:
stubvm.CrunchRunCrashRate = 0.1
stubvm.ArvMountDeadlockRate = 0.1
}
+ return nil
}
s.stubDriver.Bugf = c.Errorf
select {
case <-done:
// loop will end because len(waiting)==0
- case <-time.After(3 * time.Second):
+ case <-time.After(5 * time.Second):
if len(waiting) >= waswaiting {
- c.Fatalf("timed out; no progress in 3s while waiting for %d containers: %q", len(waiting), waiting)
+ c.Fatalf("timed out; no progress in 5 s while waiting for %d containers: %q", len(waiting), waiting)
}
}
}
}
}
+ // The counter is incremented with atomic.AddInt64 in the SetupVM
+ // callback, so it must also be read atomically here; a plain read
+ // would be a data race if a create call is still in flight.
+ c.Check(atomic.LoadInt64(&countCapacityErrors), check.Not(check.Equals), int64(0))
+
req := httptest.NewRequest("GET", "/metrics", nil)
req.Header.Set("Authorization", "Bearer "+s.cluster.ManagementToken)
resp := httptest.NewRecorder()
sr := getInstances()
c.Check(len(sr.Items), check.Equals, 0)
+ s.stubDriver.ErrorRateCreate = 0
ch := s.disp.pool.Subscribe()
defer s.disp.pool.Unsubscribe(ch)
ok := s.disp.pool.Create(test.InstanceType(1))