package dispatchcloud

import (
+ "context"
"encoding/json"
- "fmt"
- "io"
"io/ioutil"
"math/rand"
+ "net/http"
"net/http/httptest"
"os"
- "regexp"
- "strings"
"sync"
"time"
- "git.curoverse.com/arvados.git/lib/cloud"
"git.curoverse.com/arvados.git/lib/dispatchcloud/test"
"git.curoverse.com/arvados.git/sdk/go/arvados"
- "github.com/Sirupsen/logrus"
+ "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+ "git.curoverse.com/arvados.git/sdk/go/ctxlog"
+ "github.com/prometheus/client_golang/prometheus"
"golang.org/x/crypto/ssh"
check "gopkg.in/check.v1"
)

var _ = check.Suite(&DispatcherSuite{})

-// fakeCloud provides an exec method that can be used as a
-// test.StubExecFunc. It calls the provided makeVM func when called
-// with a previously unseen instance ID. Calls to exec are passed on
-// to the *fakeVM for the appropriate instance ID.
-type fakeCloud struct {
- queue *test.Queue
- makeVM func(cloud.Instance) *fakeVM
- onComplete func(string)
- onCancel func(string)
- vms map[cloud.InstanceID]*fakeVM
- sync.Mutex
-}
-
-func (fc *fakeCloud) exec(inst cloud.Instance, command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
- fc.Lock()
- fvm, ok := fc.vms[inst.ID()]
- if !ok {
- if fc.vms == nil {
- fc.vms = make(map[cloud.InstanceID]*fakeVM)
- }
- fvm = fc.makeVM(inst)
- fc.vms[inst.ID()] = fvm
- }
- fc.Unlock()
- return fvm.exec(fc.queue, fc.onComplete, fc.onCancel, command, stdin, stdout, stderr)
-}
-
-// fakeVM is a fake VM with configurable delays and failure modes.
-type fakeVM struct {
- boot time.Time
- broken time.Time
- crunchRunMissing bool
- crunchRunCrashRate float64
- crunchRunDetachDelay time.Duration
- ctrExit int
- running map[string]bool
- completed []string
- sync.Mutex
-}
-
-func (fvm *fakeVM) exec(queue *test.Queue, onComplete, onCancel func(uuid string), command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
- uuid := regexp.MustCompile(`.{5}-dz642-.{15}`).FindString(command)
- if eta := fvm.boot.Sub(time.Now()); eta > 0 {
- fmt.Fprintf(stderr, "stub is booting, ETA %s\n", eta)
- return 1
- }
- if !fvm.broken.IsZero() && fvm.broken.Before(time.Now()) {
- fmt.Fprintf(stderr, "cannot fork\n")
- return 2
- }
- if fvm.crunchRunMissing && strings.Contains(command, "crunch-run") {
- fmt.Fprint(stderr, "crunch-run: command not found\n")
- return 1
- }
- if strings.HasPrefix(command, "crunch-run --detach ") {
- fvm.Lock()
- if fvm.running == nil {
- fvm.running = map[string]bool{}
- }
- fvm.running[uuid] = true
- fvm.Unlock()
- time.Sleep(fvm.crunchRunDetachDelay)
- fmt.Fprintf(stderr, "starting %s\n", uuid)
- logger := logrus.WithField("ContainerUUID", uuid)
- logger.Printf("[test] starting crunch-run stub")
- go func() {
- crashluck := rand.Float64()
- ctr, ok := queue.Get(uuid)
- if !ok {
- logger.Print("[test] container not in queue")
- return
- }
- if crashluck > fvm.crunchRunCrashRate/2 {
- time.Sleep(time.Duration(rand.Float64()*20) * time.Millisecond)
- ctr.State = arvados.ContainerStateRunning
- queue.Notify(ctr)
- }
-
- time.Sleep(time.Duration(rand.Float64()*20) * time.Millisecond)
- fvm.Lock()
- _, running := fvm.running[uuid]
- fvm.Unlock()
- if !running {
- logger.Print("[test] container was killed")
- return
- }
- // TODO: Check whether the stub instance has
- // been destroyed, and if so, don't call
- // onComplete. Then "container finished twice"
- // can be classified as a bug.
- if crashluck < fvm.crunchRunCrashRate {
- logger.Print("[test] crashing crunch-run stub")
- if onCancel != nil && ctr.State == arvados.ContainerStateRunning {
- onCancel(uuid)
- }
- } else {
- ctr.State = arvados.ContainerStateComplete
- ctr.ExitCode = fvm.ctrExit
- queue.Notify(ctr)
- if onComplete != nil {
- onComplete(uuid)
- }
- }
- logger.Print("[test] exiting crunch-run stub")
- fvm.Lock()
- defer fvm.Unlock()
- delete(fvm.running, uuid)
- }()
- return 0
- }
- if command == "crunch-run --list" {
- fvm.Lock()
- defer fvm.Unlock()
- for uuid := range fvm.running {
- fmt.Fprintf(stdout, "%s\n", uuid)
- }
- return 0
- }
- if strings.HasPrefix(command, "crunch-run --kill ") {
- fvm.Lock()
- defer fvm.Unlock()
- if fvm.running[uuid] {
- delete(fvm.running, uuid)
- } else {
- fmt.Fprintf(stderr, "%s: container is not running\n", uuid)
- }
- return 0
- }
- if command == "true" {
- return 0
- }
- fmt.Fprintf(stderr, "%q: command not found", command)
- return 1
-}
-
type DispatcherSuite struct {
- cluster *arvados.Cluster
- instanceSet *test.LameInstanceSet
- stubDriver *test.StubDriver
- disp *dispatcher
-}
-
-func (s *DispatcherSuite) SetUpSuite(c *check.C) {
- if os.Getenv("ARVADOS_DEBUG") != "" {
- logrus.StandardLogger().SetLevel(logrus.DebugLevel)
- }
+ ctx context.Context
+ cancel context.CancelFunc
+ cluster *arvados.Cluster
+ stubDriver *test.StubDriver
+ disp *dispatcher
}

func (s *DispatcherSuite) SetUpTest(c *check.C) {
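+	// Each test gets a cancellable context with a per-test logger;
+	// TearDownTest cancels it to shut down background goroutines.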
+ s.ctx, s.cancel = context.WithCancel(context.Background())
+ s.ctx = ctxlog.Context(s.ctx, ctxlog.TestLogger(c))
dispatchpub, _ := test.LoadTestKey(c, "test/sshkey_dispatch")
dispatchprivraw, err := ioutil.ReadFile("test/sshkey_dispatch")
c.Assert(err, check.IsNil)
_, hostpriv := test.LoadTestKey(c, "test/sshkey_vm")
s.stubDriver = &test.StubDriver{
- Exec: func(inst cloud.Instance, command string, _ io.Reader, _, _ io.Writer) uint32 {
- c.Logf("stubDriver SSHExecFunc(%s, %q, ...)", inst, command)
- return 1
- },
- HostKey: hostpriv,
- AuthorizedKeys: []ssh.PublicKey{dispatchpub},
-
- ErrorRateDestroy: 0.1,
+ HostKey: hostpriv,
+ AuthorizedKeys: []ssh.PublicKey{dispatchpub},
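+	// Simulate cloud-API flakiness: roughly 10% of Destroy calls
+	// fail, and Create calls are spaced at least 1ms apart.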
+ ErrorRateDestroy: 0.1,
+ MinTimeBetweenCreateCalls: time.Millisecond,
}
s.cluster = &arvados.Cluster{
- CloudVMs: arvados.CloudVMs{
- Driver: "test",
- SyncInterval: arvados.Duration(10 * time.Millisecond),
- TimeoutIdle: arvados.Duration(30 * time.Millisecond),
- TimeoutBooting: arvados.Duration(30 * time.Millisecond),
- TimeoutProbe: arvados.Duration(15 * time.Millisecond),
- TimeoutShutdown: arvados.Duration(5 * time.Millisecond),
- },
- Dispatch: arvados.Dispatch{
- PrivateKey: dispatchprivraw,
- PollInterval: arvados.Duration(5 * time.Millisecond),
- ProbeInterval: arvados.Duration(5 * time.Millisecond),
+ ManagementToken: "test-management-token",
+ Containers: arvados.ContainersConfig{
+ DispatchPrivateKey: string(dispatchprivraw),
StaleLockTimeout: arvados.Duration(5 * time.Millisecond),
- MaxProbesPerSecond: 1000,
+ CloudVMs: arvados.CloudVMsConfig{
+ Driver: "test",
+ SyncInterval: arvados.Duration(10 * time.Millisecond),
+ TimeoutIdle: arvados.Duration(150 * time.Millisecond),
+ TimeoutBooting: arvados.Duration(150 * time.Millisecond),
+ TimeoutProbe: arvados.Duration(15 * time.Millisecond),
+ TimeoutShutdown: arvados.Duration(5 * time.Millisecond),
+ MaxCloudOpsPerSecond: 500,
+ PollInterval: arvados.Duration(5 * time.Millisecond),
+ ProbeInterval: arvados.Duration(5 * time.Millisecond),
+ MaxProbesPerSecond: 1000,
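+	// Keep the signal/TERM timeouts short so container-kill code
+	// paths are exercised quickly under test.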
+ TimeoutSignal: arvados.Duration(3 * time.Millisecond),
+ TimeoutTERM: arvados.Duration(20 * time.Millisecond),
+ ResourceTags: map[string]string{"testtag": "test value"},
+ TagKeyPrefix: "test:",
+ },
},
InstanceTypes: arvados.InstanceTypeMap{
test.InstanceType(1).Name: test.InstanceType(1),
test.InstanceType(8).Name: test.InstanceType(8),
test.InstanceType(16).Name: test.InstanceType(16),
},
- NodeProfiles: map[string]arvados.NodeProfile{
- "*": {
- Controller: arvados.SystemServiceInstance{Listen: os.Getenv("ARVADOS_API_HOST")},
- DispatchCloud: arvados.SystemServiceInstance{Listen: ":"},
- },
- },
}
- s.disp = &dispatcher{Cluster: s.cluster}
+ arvadostest.SetServiceURL(&s.cluster.Services.DispatchCloud, "http://localhost:/")
+ arvadostest.SetServiceURL(&s.cluster.Services.Controller, "https://"+os.Getenv("ARVADOS_API_HOST")+"/")
+
+ arvClient, err := arvados.NewClientFromConfig(s.cluster)
+ c.Check(err, check.IsNil)
+
+ s.disp = &dispatcher{
+ Cluster: s.cluster,
+ Context: s.ctx,
+ ArvClient: arvClient,
+ AuthToken: arvadostest.AdminToken,
+ Registry: prometheus.NewRegistry(),
+ }
// Test cases can modify s.cluster before calling
// initialize(), and then modify private state before calling
// go run().
}

func (s *DispatcherSuite) TearDownTest(c *check.C) {
+ s.cancel()
s.disp.Close()
}

+// DispatchToStubDriver checks that the dispatcher wires everything
+// together effectively. It uses a real scheduler and worker pool with
+// a fake queue and cloud driver. The fake cloud driver injects
+// artificial errors in order to exercise a variety of code paths.
func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
- drivers["test"] = s.stubDriver
+ Drivers["test"] = s.stubDriver
s.disp.setupOnce.Do(s.disp.initialize)
queue := &test.Queue{
ChooseType: func(ctr *arvados.Container) (arvados.InstanceType, error) {
for _, ctr := range queue.Containers {
waiting[ctr.UUID] = struct{}{}
}
- onComplete := func(uuid string) {
+ finishContainer := func(ctr arvados.Container) {
mtx.Lock()
defer mtx.Unlock()
- if _, ok := waiting[uuid]; !ok {
- c.Logf("container completed twice: %s -- perhaps completed after stub instance was killed?", uuid)
+ if _, ok := waiting[ctr.UUID]; !ok {
+ c.Errorf("container completed twice: %s", ctr.UUID)
+ return
}
- delete(waiting, uuid)
+ delete(waiting, ctr.UUID)
if len(waiting) == 0 {
close(done)
}
}
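+	// executeContainer marks the container as finished and returns
+	// a pseudo-random exit code in the range 0..3.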
+ executeContainer := func(ctr arvados.Container) int {
+ finishContainer(ctr)
+ return int(rand.Uint32() & 0x3)
+ }
n := 0
- fc := &fakeCloud{
- queue: queue,
- makeVM: func(inst cloud.Instance) *fakeVM {
- n++
- fvm := &fakeVM{
- boot: time.Now().Add(time.Duration(rand.Int63n(int64(5 * time.Millisecond)))),
- crunchRunDetachDelay: time.Duration(rand.Int63n(int64(10 * time.Millisecond))),
- ctrExit: int(rand.Uint32() & 0x3),
- }
- switch n % 7 {
- case 0:
- fvm.broken = time.Now().Add(time.Duration(rand.Int63n(90)) * time.Millisecond)
- case 1:
- fvm.crunchRunMissing = true
- default:
- fvm.crunchRunCrashRate = 0.1
- }
- return fvm
- },
- onComplete: onComplete,
- onCancel: onComplete,
+ s.stubDriver.Queue = queue
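+	// Randomize each stub VM's boot and detach delays; three of
+	// every seven VMs get a distinct failure mode (broken host,
+	// missing crunch-run, self-reported broken), and the rest
+	// crash crunch-run 10% of the time.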
+ s.stubDriver.SetupVM = func(stubvm *test.StubVM) {
+ n++
+ stubvm.Boot = time.Now().Add(time.Duration(rand.Int63n(int64(5 * time.Millisecond))))
+ stubvm.CrunchRunDetachDelay = time.Duration(rand.Int63n(int64(10 * time.Millisecond)))
+ stubvm.ExecuteContainer = executeContainer
+ stubvm.CrashRunningContainer = finishContainer
+ switch n % 7 {
+ case 0:
+ stubvm.Broken = time.Now().Add(time.Duration(rand.Int63n(90)) * time.Millisecond)
+ case 1:
+ stubvm.CrunchRunMissing = true
+ case 2:
+ stubvm.ReportBroken = time.Now().Add(time.Duration(rand.Int63n(200)) * time.Millisecond)
+ default:
+ stubvm.CrunchRunCrashRate = 0.1
+ }
}
- s.stubDriver.Exec = fc.exec
start := time.Now()
go s.disp.run()
c.Fatalf("timed out; still waiting for %d containers: %q", len(waiting), waiting)
}
- deadline := time.Now().Add(time.Second)
+ deadline := time.Now().Add(5 * time.Second)
for range time.NewTicker(10 * time.Millisecond).C {
insts, err := s.stubDriver.InstanceSets()[0].Instances(nil)
c.Check(err, check.IsNil)
c.Fatalf("timed out with %d containers (%v), %d instances (%+v)", len(ents), ents, len(insts), insts)
}
}
+
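+	// Scrape the management /metrics endpoint and check that the
+	// driver-operation and instance counters moved: `[^0]` demands
+	// a nonzero count, `0\n` demands zero.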
+ req := httptest.NewRequest("GET", "/metrics", nil)
+ req.Header.Set("Authorization", "Bearer "+s.cluster.ManagementToken)
+ resp := httptest.NewRecorder()
+ s.disp.ServeHTTP(resp, req)
+ c.Check(resp.Code, check.Equals, http.StatusOK)
+ c.Check(resp.Body.String(), check.Matches, `(?ms).*driver_operations{error="0",operation="Create"} [^0].*`)
+ c.Check(resp.Body.String(), check.Matches, `(?ms).*driver_operations{error="0",operation="List"} [^0].*`)
+ c.Check(resp.Body.String(), check.Matches, `(?ms).*driver_operations{error="0",operation="Destroy"} [^0].*`)
+ c.Check(resp.Body.String(), check.Matches, `(?ms).*driver_operations{error="1",operation="Create"} [^0].*`)
+ c.Check(resp.Body.String(), check.Matches, `(?ms).*driver_operations{error="1",operation="List"} 0\n.*`)
+ c.Check(resp.Body.String(), check.Matches, `(?ms).*instances_disappeared{state="shutdown"} [^0].*`)
+ c.Check(resp.Body.String(), check.Matches, `(?ms).*instances_disappeared{state="unknown"} 0\n.*`)
+}
+
+func (s *DispatcherSuite) TestAPIPermissions(c *check.C) {
+ s.cluster.ManagementToken = "abcdefgh"
+ Drivers["test"] = s.stubDriver
+ s.disp.setupOnce.Do(s.disp.initialize)
+ s.disp.queue = &test.Queue{}
+ go s.disp.run()
+
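+	// A bogus token should get 403 Forbidden; a missing token, 401
+	// Unauthorized.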
+ for _, token := range []string{"abc", ""} {
+ req := httptest.NewRequest("GET", "/arvados/v1/dispatch/instances", nil)
+ if token != "" {
+ req.Header.Set("Authorization", "Bearer "+token)
+ }
+ resp := httptest.NewRecorder()
+ s.disp.ServeHTTP(resp, req)
+ if token == "" {
+ c.Check(resp.Code, check.Equals, http.StatusUnauthorized)
+ } else {
+ c.Check(resp.Code, check.Equals, http.StatusForbidden)
+ }
+ }
+}
+
+func (s *DispatcherSuite) TestAPIDisabled(c *check.C) {
+ s.cluster.ManagementToken = ""
+ Drivers["test"] = s.stubDriver
+ s.disp.setupOnce.Do(s.disp.initialize)
+ s.disp.queue = &test.Queue{}
+ go s.disp.run()
+
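+	// With no ManagementToken configured, the management API is
+	// disabled: every request gets 403, with or without a token.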
+ for _, token := range []string{"abc", ""} {
+ req := httptest.NewRequest("GET", "/arvados/v1/dispatch/instances", nil)
+ if token != "" {
+ req.Header.Set("Authorization", "Bearer "+token)
+ }
+ resp := httptest.NewRecorder()
+ s.disp.ServeHTTP(resp, req)
+ c.Check(resp.Code, check.Equals, http.StatusForbidden)
+ }
}

func (s *DispatcherSuite) TestInstancesAPI(c *check.C) {
- s.cluster.CloudVMs.TimeoutBooting = arvados.Duration(time.Second)
- drivers["test"] = s.stubDriver
+ s.cluster.ManagementToken = "abcdefgh"
+ s.cluster.Containers.CloudVMs.TimeoutBooting = arvados.Duration(time.Second)
+ Drivers["test"] = s.stubDriver
+ s.disp.setupOnce.Do(s.disp.initialize)
+ s.disp.queue = &test.Queue{}
+ go s.disp.run()
type instance struct {
Instance string
- WorkerState string
+ WorkerState string `json:"worker_state"`
Price float64
- LastContainerUUID string
- ArvadosInstanceType string
- ProviderInstanceType string
+ LastContainerUUID string `json:"last_container_uuid"`
+ ArvadosInstanceType string `json:"arvados_instance_type"`
+ ProviderInstanceType string `json:"provider_instance_type"`
}
type instancesResponse struct {
Items []instance
}
getInstances := func() instancesResponse {
req := httptest.NewRequest("GET", "/arvados/v1/dispatch/instances", nil)
+ req.Header.Set("Authorization", "Bearer abcdefgh")
resp := httptest.NewRecorder()
s.disp.ServeHTTP(resp, req)
var sr instancesResponse
+ c.Check(resp.Code, check.Equals, http.StatusOK)
err := json.Unmarshal(resp.Body.Bytes(), &sr)
c.Check(err, check.IsNil)
return sr
}
ch := s.disp.pool.Subscribe()
defer s.disp.pool.Unsubscribe(ch)
- err := s.disp.pool.Create(test.InstanceType(1))
- c.Check(err, check.IsNil)
+ ok := s.disp.pool.Create(test.InstanceType(1))
+ c.Check(ok, check.Equals, true)
<-ch
- sr = getInstances()
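+	// The worker pool updates asynchronously, so poll for up to a
+	// second for the new instance to appear.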
+ for deadline := time.Now().Add(time.Second); time.Now().Before(deadline); {
+ sr = getInstances()
+ if len(sr.Items) > 0 {
+ break
+ }
+ time.Sleep(time.Millisecond)
+ }
c.Assert(len(sr.Items), check.Equals, 1)
c.Check(sr.Items[0].Instance, check.Matches, "stub.*")
c.Check(sr.Items[0].WorkerState, check.Equals, "booting")