package dispatchcloud
import (
+ "context"
"encoding/json"
"io/ioutil"
"math/rand"
"git.curoverse.com/arvados.git/lib/dispatchcloud/test"
"git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+ "git.curoverse.com/arvados.git/sdk/go/ctxlog"
+ "github.com/prometheus/client_golang/prometheus"
"golang.org/x/crypto/ssh"
check "gopkg.in/check.v1"
)
// Register DispatcherSuite with gocheck.
var _ = check.Suite(&DispatcherSuite{})
// DispatcherSuite is the gocheck suite for the dispatcher. SetUpTest
// populates its fields before each test, and TearDownTest cancels the
// context and closes the dispatcher afterwards.
type DispatcherSuite struct {
// ctx is created in SetUpTest: a cancellable context that also carries
// the test logger. It is handed to the dispatcher as its Context.
+ ctx context.Context
// cancel cancels ctx; TearDownTest calls it.
+ cancel context.CancelFunc
// cluster is the cluster configuration given to the dispatcher. Test
// cases may modify it before calling initialize().
cluster *arvados.Cluster
// stubDriver is the stub cloud driver that test cases register as
// Drivers["test"].
stubDriver *test.StubDriver
// disp is the dispatcher under test.
disp *dispatcher
}
func (s *DispatcherSuite) SetUpTest(c *check.C) {
+ s.ctx, s.cancel = context.WithCancel(context.Background())
+ s.ctx = ctxlog.Context(s.ctx, ctxlog.TestLogger(c))
dispatchpub, _ := test.LoadTestKey(c, "test/sshkey_dispatch")
dispatchprivraw, err := ioutil.ReadFile("test/sshkey_dispatch")
c.Assert(err, check.IsNil)
}
s.cluster = &arvados.Cluster{
- CloudVMs: arvados.CloudVMs{
- Driver: "test",
- SyncInterval: arvados.Duration(10 * time.Millisecond),
- TimeoutIdle: arvados.Duration(150 * time.Millisecond),
- TimeoutBooting: arvados.Duration(150 * time.Millisecond),
- TimeoutProbe: arvados.Duration(15 * time.Millisecond),
- TimeoutShutdown: arvados.Duration(5 * time.Millisecond),
- },
- Dispatch: arvados.Dispatch{
- PrivateKey: dispatchprivraw,
- PollInterval: arvados.Duration(5 * time.Millisecond),
- ProbeInterval: arvados.Duration(5 * time.Millisecond),
+ ManagementToken: "test-management-token",
+ Containers: arvados.ContainersConfig{
+ DispatchPrivateKey: string(dispatchprivraw),
StaleLockTimeout: arvados.Duration(5 * time.Millisecond),
- MaxProbesPerSecond: 1000,
+ CloudVMs: arvados.CloudVMsConfig{
+ Driver: "test",
+ SyncInterval: arvados.Duration(10 * time.Millisecond),
+ TimeoutIdle: arvados.Duration(150 * time.Millisecond),
+ TimeoutBooting: arvados.Duration(150 * time.Millisecond),
+ TimeoutProbe: arvados.Duration(15 * time.Millisecond),
+ TimeoutShutdown: arvados.Duration(5 * time.Millisecond),
+ MaxCloudOpsPerSecond: 500,
+ PollInterval: arvados.Duration(5 * time.Millisecond),
+ ProbeInterval: arvados.Duration(5 * time.Millisecond),
+ MaxProbesPerSecond: 1000,
+ TimeoutSignal: arvados.Duration(3 * time.Millisecond),
+ TimeoutTERM: arvados.Duration(20 * time.Millisecond),
+ ResourceTags: map[string]string{"testtag": "test value"},
+ TagKeyPrefix: "test:",
+ },
},
InstanceTypes: arvados.InstanceTypeMap{
test.InstanceType(1).Name: test.InstanceType(1),
test.InstanceType(8).Name: test.InstanceType(8),
test.InstanceType(16).Name: test.InstanceType(16),
},
- NodeProfiles: map[string]arvados.NodeProfile{
- "*": {
- Controller: arvados.SystemServiceInstance{Listen: os.Getenv("ARVADOS_API_HOST")},
- DispatchCloud: arvados.SystemServiceInstance{Listen: ":"},
- },
- },
}
- s.disp = &dispatcher{Cluster: s.cluster}
+ arvadostest.SetServiceURL(&s.cluster.Services.DispatchCloud, "http://localhost:/")
+ arvadostest.SetServiceURL(&s.cluster.Services.Controller, "https://"+os.Getenv("ARVADOS_API_HOST")+"/")
+
+ arvClient, err := arvados.NewClientFromConfig(s.cluster)
+ c.Check(err, check.IsNil)
+
+ s.disp = &dispatcher{
+ Cluster: s.cluster,
+ Context: s.ctx,
+ ArvClient: arvClient,
+ AuthToken: arvadostest.AdminToken,
+ Registry: prometheus.NewRegistry(),
+ }
// Test cases can modify s.cluster before calling
// initialize(), and then modify private state before calling
// go run().
}
// TearDownTest is the per-test teardown fixture: it cancels the context
// that SetUpTest created, then closes the dispatcher.
func (s *DispatcherSuite) TearDownTest(c *check.C) {
+ s.cancel()
s.disp.Close()
}
// a fake queue and cloud driver. The fake cloud driver injects
// artificial errors in order to exercise a variety of code paths.
func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
- drivers["test"] = s.stubDriver
+ Drivers["test"] = s.stubDriver
s.disp.setupOnce.Do(s.disp.initialize)
queue := &test.Queue{
ChooseType: func(ctr *arvados.Container) (arvados.InstanceType, error) {
for _, ctr := range queue.Containers {
waiting[ctr.UUID] = struct{}{}
}
- executeContainer := func(ctr arvados.Container) int {
+ finishContainer := func(ctr arvados.Container) {
mtx.Lock()
defer mtx.Unlock()
if _, ok := waiting[ctr.UUID]; !ok {
- c.Logf("container completed twice: %s -- perhaps completed after stub instance was killed?", ctr.UUID)
- return 1
+ c.Errorf("container completed twice: %s", ctr.UUID)
+ return
}
delete(waiting, ctr.UUID)
if len(waiting) == 0 {
close(done)
}
+ }
+ executeContainer := func(ctr arvados.Container) int {
+ finishContainer(ctr)
return int(rand.Uint32() & 0x3)
}
n := 0
stubvm.Boot = time.Now().Add(time.Duration(rand.Int63n(int64(5 * time.Millisecond))))
stubvm.CrunchRunDetachDelay = time.Duration(rand.Int63n(int64(10 * time.Millisecond)))
stubvm.ExecuteContainer = executeContainer
+ stubvm.CrashRunningContainer = finishContainer
switch n % 7 {
case 0:
stubvm.Broken = time.Now().Add(time.Duration(rand.Int63n(90)) * time.Millisecond)
case 1:
stubvm.CrunchRunMissing = true
+ case 2:
+ stubvm.ReportBroken = time.Now().Add(time.Duration(rand.Int63n(200)) * time.Millisecond)
default:
stubvm.CrunchRunCrashRate = 0.1
}
c.Fatalf("timed out; still waiting for %d containers: %q", len(waiting), waiting)
}
- deadline := time.Now().Add(time.Second)
+ deadline := time.Now().Add(5 * time.Second)
for range time.NewTicker(10 * time.Millisecond).C {
insts, err := s.stubDriver.InstanceSets()[0].Instances(nil)
c.Check(err, check.IsNil)
c.Fatalf("timed out with %d containers (%v), %d instances (%+v)", len(ents), ents, len(insts), insts)
}
}
+
+ req := httptest.NewRequest("GET", "/metrics", nil)
+ req.Header.Set("Authorization", "Bearer "+s.cluster.ManagementToken)
+ resp := httptest.NewRecorder()
+ s.disp.ServeHTTP(resp, req)
+ c.Check(resp.Code, check.Equals, http.StatusOK)
+ c.Check(resp.Body.String(), check.Matches, `(?ms).*driver_operations{error="0",operation="Create"} [^0].*`)
+ c.Check(resp.Body.String(), check.Matches, `(?ms).*driver_operations{error="0",operation="List"} [^0].*`)
+ c.Check(resp.Body.String(), check.Matches, `(?ms).*driver_operations{error="0",operation="Destroy"} [^0].*`)
+ c.Check(resp.Body.String(), check.Matches, `(?ms).*driver_operations{error="1",operation="Create"} [^0].*`)
+ c.Check(resp.Body.String(), check.Matches, `(?ms).*driver_operations{error="1",operation="List"} 0\n.*`)
+ c.Check(resp.Body.String(), check.Matches, `(?ms).*instances_disappeared{state="shutdown"} [^0].*`)
+ c.Check(resp.Body.String(), check.Matches, `(?ms).*instances_disappeared{state="unknown"} 0\n.*`)
}
func (s *DispatcherSuite) TestAPIPermissions(c *check.C) {
s.cluster.ManagementToken = "abcdefgh"
- drivers["test"] = s.stubDriver
+ Drivers["test"] = s.stubDriver
s.disp.setupOnce.Do(s.disp.initialize)
s.disp.queue = &test.Queue{}
go s.disp.run()
func (s *DispatcherSuite) TestAPIDisabled(c *check.C) {
s.cluster.ManagementToken = ""
- drivers["test"] = s.stubDriver
+ Drivers["test"] = s.stubDriver
s.disp.setupOnce.Do(s.disp.initialize)
s.disp.queue = &test.Queue{}
go s.disp.run()
func (s *DispatcherSuite) TestInstancesAPI(c *check.C) {
s.cluster.ManagementToken = "abcdefgh"
- s.cluster.CloudVMs.TimeoutBooting = arvados.Duration(time.Second)
- drivers["test"] = s.stubDriver
+ s.cluster.Containers.CloudVMs.TimeoutBooting = arvados.Duration(time.Second)
+ Drivers["test"] = s.stubDriver
s.disp.setupOnce.Do(s.disp.initialize)
s.disp.queue = &test.Queue{}
go s.disp.run()
c.Check(ok, check.Equals, true)
<-ch
- sr = getInstances()
+ for deadline := time.Now().Add(time.Second); time.Now().Before(deadline); {
+ sr = getInstances()
+ if len(sr.Items) > 0 {
+ break
+ }
+ time.Sleep(time.Millisecond)
+ }
c.Assert(len(sr.Items), check.Equals, 1)
c.Check(sr.Items[0].Instance, check.Matches, "stub.*")
c.Check(sr.Items[0].WorkerState, check.Equals, "booting")