import (
"context"
+ "crypto/tls"
"encoding/json"
"io/ioutil"
"math/rand"
"net/http"
"net/http/httptest"
+ "net/url"
"os"
"sync"
+ "sync/atomic"
"time"
+ "git.arvados.org/arvados.git/lib/config"
"git.arvados.org/arvados.git/lib/dispatchcloud/test"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadostest"
var _ = check.Suite(&DispatcherSuite{})
type DispatcherSuite struct {
- ctx context.Context
- cancel context.CancelFunc
- cluster *arvados.Cluster
- stubDriver *test.StubDriver
- disp *dispatcher
+ ctx context.Context
+ cancel context.CancelFunc
+ cluster *arvados.Cluster
+ stubDriver *test.StubDriver
+ disp *dispatcher
+ error503Server *httptest.Server
}
func (s *DispatcherSuite) SetUpTest(c *check.C) {
s.stubDriver = &test.StubDriver{
HostKey: hostpriv,
AuthorizedKeys: []ssh.PublicKey{dispatchpub},
+ ErrorRateCreate: 0.1,
ErrorRateDestroy: 0.1,
MinTimeBetweenCreateCalls: time.Millisecond,
+ QuotaMaxInstances: 10,
}
+ // We need the postgresql connection info from the integration
+ // test config.
+ cfg, err := config.NewLoader(nil, ctxlog.FromContext(s.ctx)).Load()
+ c.Assert(err, check.IsNil)
+ testcluster, err := cfg.GetCluster("")
+ c.Assert(err, check.IsNil)
+
s.cluster = &arvados.Cluster{
ManagementToken: "test-management-token",
+ PostgreSQL: testcluster.PostgreSQL,
Containers: arvados.ContainersConfig{
- DispatchPrivateKey: string(dispatchprivraw),
- StaleLockTimeout: arvados.Duration(5 * time.Millisecond),
+ CrunchRunCommand: "crunch-run",
+ CrunchRunArgumentsList: []string{"--foo", "--extra='args'"},
+ DispatchPrivateKey: string(dispatchprivraw),
+ StaleLockTimeout: arvados.Duration(5 * time.Millisecond),
+ RuntimeEngine: "stub",
+ MaxDispatchAttempts: 10,
+ MaximumPriceFactor: 1.5,
CloudVMs: arvados.CloudVMsConfig{
Driver: "test",
SyncInterval: arvados.Duration(10 * time.Millisecond),
TimeoutProbe: arvados.Duration(15 * time.Millisecond),
TimeoutShutdown: arvados.Duration(5 * time.Millisecond),
MaxCloudOpsPerSecond: 500,
+ InitialQuotaEstimate: 8,
PollInterval: arvados.Duration(5 * time.Millisecond),
ProbeInterval: arvados.Duration(5 * time.Millisecond),
MaxProbesPerSecond: 1000,
arvadostest.SetServiceURL(&s.cluster.Services.Controller, "https://"+os.Getenv("ARVADOS_API_HOST")+"/")
arvClient, err := arvados.NewClientFromConfig(s.cluster)
- c.Check(err, check.IsNil)
+ c.Assert(err, check.IsNil)
+ // Disable auto-retry
+ arvClient.Timeout = 0
+
+ s.error503Server = httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ c.Logf("503 stub: returning 503")
+ w.WriteHeader(http.StatusServiceUnavailable)
+ }))
+ arvClient.Client = &http.Client{
+ Transport: &http.Transport{
+ Proxy: s.arvClientProxy(c),
+ TLSClientConfig: &tls.Config{
+ InsecureSkipVerify: true}}}
s.disp = &dispatcher{
Cluster: s.cluster,
func (s *DispatcherSuite) TearDownTest(c *check.C) {
s.cancel()
s.disp.Close()
+ s.error503Server.Close()
+}
+
+// Intercept outgoing API requests for "/503" and respond HTTP
+// 503. This lets us force (*arvados.Client)Last503() to return
+// something.
+func (s *DispatcherSuite) arvClientProxy(c *check.C) func(*http.Request) (*url.URL, error) {
+ return func(req *http.Request) (*url.URL, error) {
+ if req.URL.Path == "/503" {
+ c.Logf("arvClientProxy: proxying to 503 stub")
+ return url.Parse(s.error503Server.URL)
+ } else {
+ return nil, nil
+ }
+ }
}
// DispatchToStubDriver checks that the dispatcher wires everything
Drivers["test"] = s.stubDriver
s.disp.setupOnce.Do(s.disp.initialize)
queue := &test.Queue{
- ChooseType: func(ctr *arvados.Container) (arvados.InstanceType, error) {
+ MaxDispatchAttempts: 5,
+ ChooseType: func(ctr *arvados.Container) ([]arvados.InstanceType, error) {
return ChooseInstanceType(s.cluster, ctr)
},
Logger: ctxlog.TestLogger(c),
return
}
delete(waiting, ctr.UUID)
+ if len(waiting) == 100 {
+ // trigger scheduler maxConcurrency limit
+ c.Logf("test: requesting 503 in order to trigger maxConcurrency limit")
+ s.disp.ArvClient.RequestAndDecode(nil, "GET", "503", nil, nil)
+ }
if len(waiting) == 0 {
close(done)
}
finishContainer(ctr)
return int(rand.Uint32() & 0x3)
}
+ var countCapacityErrors int64
n := 0
s.stubDriver.Queue = queue
- s.stubDriver.SetupVM = func(stubvm *test.StubVM) {
+ s.stubDriver.SetupVM = func(stubvm *test.StubVM) error {
+ if pt := stubvm.Instance().ProviderType(); pt == test.InstanceType(6).ProviderType {
+ c.Logf("test: returning capacity error for instance type %s", pt)
+ atomic.AddInt64(&countCapacityErrors, 1)
+ return test.CapacityError{InstanceTypeSpecific: true}
+ }
n++
stubvm.Boot = time.Now().Add(time.Duration(rand.Int63n(int64(5 * time.Millisecond))))
stubvm.CrunchRunDetachDelay = time.Duration(rand.Int63n(int64(10 * time.Millisecond)))
stubvm.ExecuteContainer = executeContainer
stubvm.CrashRunningContainer = finishContainer
- switch n % 7 {
- case 0:
+ stubvm.ExtraCrunchRunArgs = "'--runtime-engine=stub' '--foo' '--extra='\\''args'\\'''"
+ switch {
+ case n%7 == 0:
+ // some instances start out OK but then stop
+ // running any commands
stubvm.Broken = time.Now().Add(time.Duration(rand.Int63n(90)) * time.Millisecond)
- case 1:
+ case n%7 == 1:
+ // some instances never pass a run-probe
stubvm.CrunchRunMissing = true
- case 2:
+ case n%7 == 2:
+ // some instances start out OK but then start
+ // reporting themselves as broken
stubvm.ReportBroken = time.Now().Add(time.Duration(rand.Int63n(200)) * time.Millisecond)
+ case n == 3:
+ // 1 instance is completely broken, ensuring
+ // the boot_outcomes{outcome="failure"} metric
+ // is not zero
+ stubvm.CrunchRunCrashRate = 1
default:
stubvm.CrunchRunCrashRate = 0.1
stubvm.ArvMountDeadlockRate = 0.1
}
+ return nil
}
s.stubDriver.Bugf = c.Errorf
err := s.disp.CheckHealth()
c.Check(err, check.IsNil)
- select {
- case <-done:
- c.Logf("containers finished (%s), waiting for instances to shutdown and queue to clear", time.Since(start))
- case <-time.After(10 * time.Second):
- c.Fatalf("timed out; still waiting for %d containers: %q", len(waiting), waiting)
+ for len(waiting) > 0 {
+ waswaiting := len(waiting)
+ select {
+ case <-done:
+ // loop will end because len(waiting)==0
+ case <-time.After(5 * time.Second):
+ if len(waiting) >= waswaiting {
+ c.Fatalf("timed out; no progress in 5 s while waiting for %d containers: %q", len(waiting), waiting)
+ }
+ }
}
+ c.Logf("containers finished (%s), waiting for instances to shutdown and queue to clear", time.Since(start))
deadline := time.Now().Add(5 * time.Second)
for range time.NewTicker(10 * time.Millisecond).C {
}
}
+ c.Check(countCapacityErrors, check.Not(check.Equals), int64(0))
+
req := httptest.NewRequest("GET", "/metrics", nil)
req.Header.Set("Authorization", "Bearer "+s.cluster.ManagementToken)
resp := httptest.NewRecorder()
c.Check(resp.Body.String(), check.Matches, `(?ms).*driver_operations{error="0",operation="Destroy"} [^0].*`)
c.Check(resp.Body.String(), check.Matches, `(?ms).*driver_operations{error="1",operation="Create"} [^0].*`)
c.Check(resp.Body.String(), check.Matches, `(?ms).*driver_operations{error="1",operation="List"} 0\n.*`)
- c.Check(resp.Body.String(), check.Matches, `(?ms).*boot_outcomes{outcome="aborted"} 0.*`)
+ c.Check(resp.Body.String(), check.Matches, `(?ms).*boot_outcomes{outcome="aborted"} [0-9]+\n.*`)
c.Check(resp.Body.String(), check.Matches, `(?ms).*boot_outcomes{outcome="disappeared"} [^0].*`)
c.Check(resp.Body.String(), check.Matches, `(?ms).*boot_outcomes{outcome="failure"} [^0].*`)
c.Check(resp.Body.String(), check.Matches, `(?ms).*boot_outcomes{outcome="success"} [^0].*`)
c.Check(resp.Body.String(), check.Matches, `(?ms).*run_probe_duration_seconds_sum{outcome="success"} [0-9e+.]*`)
c.Check(resp.Body.String(), check.Matches, `(?ms).*run_probe_duration_seconds_count{outcome="fail"} [0-9]*`)
c.Check(resp.Body.String(), check.Matches, `(?ms).*run_probe_duration_seconds_sum{outcome="fail"} [0-9e+.]*`)
+ c.Check(resp.Body.String(), check.Matches, `(?ms).*last_503_time [1-9][0-9e+.]*`)
+ c.Check(resp.Body.String(), check.Matches, `(?ms).*max_concurrent_containers [1-9][0-9e+.]*`)
}
func (s *DispatcherSuite) TestAPIPermissions(c *check.C) {
sr := getInstances()
c.Check(len(sr.Items), check.Equals, 0)
+ s.stubDriver.ErrorRateCreate = 0
ch := s.disp.pool.Subscribe()
defer s.disp.pool.Unsubscribe(ch)
ok := s.disp.pool.Create(test.InstanceType(1))