import (
"context"
+ "crypto/tls"
"encoding/json"
"io/ioutil"
"math/rand"
"net/http"
"net/http/httptest"
+ "net/url"
"os"
"sync"
"time"
var _ = check.Suite(&DispatcherSuite{})
type DispatcherSuite struct {
- ctx context.Context
- cancel context.CancelFunc
- cluster *arvados.Cluster
- stubDriver *test.StubDriver
- disp *dispatcher
+ ctx context.Context
+ cancel context.CancelFunc
+ cluster *arvados.Cluster
+ stubDriver *test.StubDriver
+ disp *dispatcher
+ error503Server *httptest.Server
}
func (s *DispatcherSuite) SetUpTest(c *check.C) {
s.stubDriver = &test.StubDriver{
HostKey: hostpriv,
AuthorizedKeys: []ssh.PublicKey{dispatchpub},
+ ErrorRateCreate: 0.1,
ErrorRateDestroy: 0.1,
MinTimeBetweenCreateCalls: time.Millisecond,
}
DispatchPrivateKey: string(dispatchprivraw),
StaleLockTimeout: arvados.Duration(5 * time.Millisecond),
RuntimeEngine: "stub",
+ MaxDispatchAttempts: 10,
CloudVMs: arvados.CloudVMsConfig{
Driver: "test",
SyncInterval: arvados.Duration(10 * time.Millisecond),
TimeoutProbe: arvados.Duration(15 * time.Millisecond),
TimeoutShutdown: arvados.Duration(5 * time.Millisecond),
MaxCloudOpsPerSecond: 500,
+ InitialQuotaEstimate: 8,
PollInterval: arvados.Duration(5 * time.Millisecond),
ProbeInterval: arvados.Duration(5 * time.Millisecond),
MaxProbesPerSecond: 1000,
arvadostest.SetServiceURL(&s.cluster.Services.Controller, "https://"+os.Getenv("ARVADOS_API_HOST")+"/")
arvClient, err := arvados.NewClientFromConfig(s.cluster)
- c.Check(err, check.IsNil)
+ c.Assert(err, check.IsNil)
+ // Disable auto-retry
+ arvClient.Timeout = 0
+
+ s.error503Server = httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ c.Logf("503 stub: returning 503")
+ w.WriteHeader(http.StatusServiceUnavailable)
+ }))
+ arvClient.Client = &http.Client{
+ Transport: &http.Transport{
+ Proxy: s.arvClientProxy(c),
+ TLSClientConfig: &tls.Config{
+ InsecureSkipVerify: true}}}
s.disp = &dispatcher{
Cluster: s.cluster,
func (s *DispatcherSuite) TearDownTest(c *check.C) {
s.cancel()
s.disp.Close()
+ s.error503Server.Close()
+}
+
+// Intercept outgoing API requests for "/503" and respond HTTP
+// 503. This lets us force (*arvados.Client)Last503() to return
+// something.
+func (s *DispatcherSuite) arvClientProxy(c *check.C) func(*http.Request) (*url.URL, error) {
+ return func(req *http.Request) (*url.URL, error) {
+ if req.URL.Path == "/503" {
+ c.Logf("arvClientProxy: proxying to 503 stub")
+ return url.Parse(s.error503Server.URL)
+ } else {
+ return nil, nil
+ }
+ }
}
// DispatchToStubDriver checks that the dispatcher wires everything
Drivers["test"] = s.stubDriver
s.disp.setupOnce.Do(s.disp.initialize)
queue := &test.Queue{
+ MaxDispatchAttempts: 5,
ChooseType: func(ctr *arvados.Container) (arvados.InstanceType, error) {
return ChooseInstanceType(s.cluster, ctr)
},
return
}
delete(waiting, ctr.UUID)
+ if len(waiting) == 100 {
+ // trigger scheduler maxConcurrency limit
+ c.Logf("test: requesting 503 in order to trigger maxConcurrency limit")
+ s.disp.ArvClient.RequestAndDecode(nil, "GET", "503", nil, nil)
+ }
if len(waiting) == 0 {
close(done)
}
select {
case <-done:
// loop will end because len(waiting)==0
- case <-time.After(3 * time.Second):
+ case <-time.After(5 * time.Second):
if len(waiting) >= waswaiting {
- c.Fatalf("timed out; no progress in 3s while waiting for %d containers: %q", len(waiting), waiting)
+ c.Fatalf("timed out; no progress in 5 s while waiting for %d containers: %q", len(waiting), waiting)
}
}
}
c.Check(resp.Body.String(), check.Matches, `(?ms).*driver_operations{error="0",operation="Destroy"} [^0].*`)
c.Check(resp.Body.String(), check.Matches, `(?ms).*driver_operations{error="1",operation="Create"} [^0].*`)
c.Check(resp.Body.String(), check.Matches, `(?ms).*driver_operations{error="1",operation="List"} 0\n.*`)
- c.Check(resp.Body.String(), check.Matches, `(?ms).*boot_outcomes{outcome="aborted"} 0.*`)
+ c.Check(resp.Body.String(), check.Matches, `(?ms).*boot_outcomes{outcome="aborted"} [0-9]+\n.*`)
c.Check(resp.Body.String(), check.Matches, `(?ms).*boot_outcomes{outcome="disappeared"} [^0].*`)
c.Check(resp.Body.String(), check.Matches, `(?ms).*boot_outcomes{outcome="failure"} [^0].*`)
c.Check(resp.Body.String(), check.Matches, `(?ms).*boot_outcomes{outcome="success"} [^0].*`)
c.Check(resp.Body.String(), check.Matches, `(?ms).*run_probe_duration_seconds_sum{outcome="success"} [0-9e+.]*`)
c.Check(resp.Body.String(), check.Matches, `(?ms).*run_probe_duration_seconds_count{outcome="fail"} [0-9]*`)
c.Check(resp.Body.String(), check.Matches, `(?ms).*run_probe_duration_seconds_sum{outcome="fail"} [0-9e+.]*`)
+ c.Check(resp.Body.String(), check.Matches, `(?ms).*last_503_time [1-9][0-9e+.]*`)
+ c.Check(resp.Body.String(), check.Matches, `(?ms).*max_concurrent_containers [1-9][0-9e+.]*`)
}
func (s *DispatcherSuite) TestAPIPermissions(c *check.C) {