import (
"encoding/json"
- "fmt"
- "io"
"io/ioutil"
"math/rand"
+ "net/http"
"net/http/httptest"
"os"
- "regexp"
- "strings"
"sync"
"time"
- "git.curoverse.com/arvados.git/lib/cloud"
"git.curoverse.com/arvados.git/lib/dispatchcloud/test"
"git.curoverse.com/arvados.git/sdk/go/arvados"
- "github.com/Sirupsen/logrus"
+ "github.com/sirupsen/logrus"
"golang.org/x/crypto/ssh"
check "gopkg.in/check.v1"
)
var _ = check.Suite(&DispatcherSuite{})
-// fakeCloud provides an exec method that can be used as a
-// test.StubExecFunc. It calls the provided makeVM func when called
-// with a previously unseen instance ID. Calls to exec are passed on
-// to the *fakeVM for the appropriate instance ID.
-type fakeCloud struct {
- queue *test.Queue
- makeVM func(cloud.Instance) *fakeVM
- onComplete func(string)
- onCancel func(string)
- vms map[cloud.InstanceID]*fakeVM
- sync.Mutex
-}
-
-func (fc *fakeCloud) exec(inst cloud.Instance, command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
- fc.Lock()
- fvm, ok := fc.vms[inst.ID()]
- if !ok {
- if fc.vms == nil {
- fc.vms = make(map[cloud.InstanceID]*fakeVM)
- }
- fvm = fc.makeVM(inst)
- fc.vms[inst.ID()] = fvm
- }
- fc.Unlock()
- return fvm.exec(fc.queue, fc.onComplete, fc.onCancel, command, stdin, stdout, stderr)
-}
-
-// fakeVM is a fake VM with configurable delays and failure modes.
-type fakeVM struct {
- boot time.Time
- broken time.Time
- crunchRunMissing bool
- crunchRunCrashRate float64
- crunchRunDetachDelay time.Duration
- ctrExit int
- running map[string]bool
- completed []string
- sync.Mutex
-}
-
-func (fvm *fakeVM) exec(queue *test.Queue, onComplete, onCancel func(uuid string), command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
- uuid := regexp.MustCompile(`.{5}-dz642-.{15}`).FindString(command)
- if eta := fvm.boot.Sub(time.Now()); eta > 0 {
- fmt.Fprintf(stderr, "stub is booting, ETA %s\n", eta)
- return 1
- }
- if !fvm.broken.IsZero() && fvm.broken.Before(time.Now()) {
- fmt.Fprintf(stderr, "cannot fork\n")
- return 2
- }
- if fvm.crunchRunMissing && strings.Contains(command, "crunch-run") {
- fmt.Fprint(stderr, "crunch-run: command not found\n")
- return 1
- }
- if strings.HasPrefix(command, "crunch-run --detach ") {
- fvm.Lock()
- if fvm.running == nil {
- fvm.running = map[string]bool{}
- }
- fvm.running[uuid] = true
- fvm.Unlock()
- time.Sleep(fvm.crunchRunDetachDelay)
- fmt.Fprintf(stderr, "starting %s\n", uuid)
- logger := logrus.WithField("ContainerUUID", uuid)
- logger.Printf("[test] starting crunch-run stub")
- go func() {
- crashluck := rand.Float64()
- ctr, ok := queue.Get(uuid)
- if !ok {
- logger.Print("[test] container not in queue")
- return
- }
- if crashluck > fvm.crunchRunCrashRate/2 {
- time.Sleep(time.Duration(rand.Float64()*20) * time.Millisecond)
- ctr.State = arvados.ContainerStateRunning
- queue.Notify(ctr)
- }
-
- time.Sleep(time.Duration(rand.Float64()*20) * time.Millisecond)
- fvm.Lock()
- _, running := fvm.running[uuid]
- fvm.Unlock()
- if !running {
- logger.Print("[test] container was killed")
- return
- }
- // TODO: Check whether the stub instance has
- // been destroyed, and if so, don't call
- // onComplete. Then "container finished twice"
- // can be classified as a bug.
- if crashluck < fvm.crunchRunCrashRate {
- logger.Print("[test] crashing crunch-run stub")
- if onCancel != nil && ctr.State == arvados.ContainerStateRunning {
- onCancel(uuid)
- }
- } else {
- ctr.State = arvados.ContainerStateComplete
- ctr.ExitCode = fvm.ctrExit
- queue.Notify(ctr)
- if onComplete != nil {
- onComplete(uuid)
- }
- }
- logger.Print("[test] exiting crunch-run stub")
- fvm.Lock()
- defer fvm.Unlock()
- delete(fvm.running, uuid)
- }()
- return 0
- }
- if command == "crunch-run --list" {
- fvm.Lock()
- defer fvm.Unlock()
- for uuid := range fvm.running {
- fmt.Fprintf(stdout, "%s\n", uuid)
- }
- return 0
- }
- if strings.HasPrefix(command, "crunch-run --kill ") {
- fvm.Lock()
- defer fvm.Unlock()
- if fvm.running[uuid] {
- delete(fvm.running, uuid)
- } else {
- fmt.Fprintf(stderr, "%s: container is not running\n", uuid)
- }
- return 0
- }
- if command == "true" {
- return 0
- }
- fmt.Fprintf(stderr, "%q: command not found", command)
- return 1
-}
-
type DispatcherSuite struct {
cluster *arvados.Cluster
instanceSet *test.LameInstanceSet
_, hostpriv := test.LoadTestKey(c, "test/sshkey_vm")
s.stubDriver = &test.StubDriver{
- Exec: func(inst cloud.Instance, command string, _ io.Reader, _, _ io.Writer) uint32 {
- c.Logf("stubDriver SSHExecFunc(%s, %q, ...)", inst, command)
- return 1
- },
- HostKey: hostpriv,
- AuthorizedKeys: []ssh.PublicKey{dispatchpub},
-
- ErrorRateDestroy: 0.1,
+ HostKey: hostpriv,
+ AuthorizedKeys: []ssh.PublicKey{dispatchpub},
+ ErrorRateDestroy: 0.1,
+ MinTimeBetweenCreateCalls: time.Millisecond,
}
s.cluster = &arvados.Cluster{
CloudVMs: arvados.CloudVMs{
Driver: "test",
SyncInterval: arvados.Duration(10 * time.Millisecond),
- TimeoutIdle: arvados.Duration(30 * time.Millisecond),
- TimeoutBooting: arvados.Duration(30 * time.Millisecond),
+ TimeoutIdle: arvados.Duration(150 * time.Millisecond),
+ TimeoutBooting: arvados.Duration(150 * time.Millisecond),
TimeoutProbe: arvados.Duration(15 * time.Millisecond),
TimeoutShutdown: arvados.Duration(5 * time.Millisecond),
},
s.disp.Close()
}
+// TestDispatchToStubDriver checks that the dispatcher wires everything
+// together effectively. It uses a real scheduler and worker pool with
+// a fake queue and cloud driver. The fake cloud driver injects
+// artificial errors in order to exercise a variety of code paths.
func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
drivers["test"] = s.stubDriver
s.disp.setupOnce.Do(s.disp.initialize)
for _, ctr := range queue.Containers {
waiting[ctr.UUID] = struct{}{}
}
- onComplete := func(uuid string) {
+ executeContainer := func(ctr arvados.Container) int {
mtx.Lock()
defer mtx.Unlock()
- if _, ok := waiting[uuid]; !ok {
- c.Logf("container completed twice: %s -- perhaps completed after stub instance was killed?", uuid)
+ if _, ok := waiting[ctr.UUID]; !ok {
+ c.Logf("container completed twice: %s -- perhaps completed after stub instance was killed?", ctr.UUID)
+ return 1
}
- delete(waiting, uuid)
+ delete(waiting, ctr.UUID)
if len(waiting) == 0 {
close(done)
}
+ return int(rand.Uint32() & 0x3)
}
n := 0
- fc := &fakeCloud{
- queue: queue,
- makeVM: func(inst cloud.Instance) *fakeVM {
- n++
- fvm := &fakeVM{
- boot: time.Now().Add(time.Duration(rand.Int63n(int64(5 * time.Millisecond)))),
- crunchRunDetachDelay: time.Duration(rand.Int63n(int64(10 * time.Millisecond))),
- ctrExit: int(rand.Uint32() & 0x3),
- }
- switch n % 7 {
- case 0:
- fvm.broken = time.Now().Add(time.Duration(rand.Int63n(90)) * time.Millisecond)
- case 1:
- fvm.crunchRunMissing = true
- default:
- fvm.crunchRunCrashRate = 0.1
- }
- return fvm
- },
- onComplete: onComplete,
- onCancel: onComplete,
+ s.stubDriver.Queue = queue
+ s.stubDriver.SetupVM = func(stubvm *test.StubVM) {
+ n++
+ stubvm.Boot = time.Now().Add(time.Duration(rand.Int63n(int64(5 * time.Millisecond))))
+ stubvm.CrunchRunDetachDelay = time.Duration(rand.Int63n(int64(10 * time.Millisecond)))
+ stubvm.ExecuteContainer = executeContainer
+ switch n % 7 {
+ case 0:
+ stubvm.Broken = time.Now().Add(time.Duration(rand.Int63n(90)) * time.Millisecond)
+ case 1:
+ stubvm.CrunchRunMissing = true
+ default:
+ stubvm.CrunchRunCrashRate = 0.1
+ }
}
- s.stubDriver.Exec = fc.exec
start := time.Now()
go s.disp.run()
}
}
+// TestAPIPermissions checks how the dispatcher's HTTP handler responds
+// to GET /arvados/v1/dispatch/instances when the cluster has a
+// management token configured but the request does not present it: a
+// request with no Authorization header is expected to get 401
+// Unauthorized, and a request with a non-matching bearer token is
+// expected to get 403 Forbidden.
+func (s *DispatcherSuite) TestAPIPermissions(c *check.C) {
+ s.cluster.ManagementToken = "abcdefgh"
+ drivers["test"] = s.stubDriver
+ s.disp.setupOnce.Do(s.disp.initialize)
+ // Swap in an empty test queue after initialization and before
+ // starting the dispatcher's run loop.
+ s.disp.queue = &test.Queue{}
+ // NOTE(review): this goroutine is presumably stopped by the suite's
+ // teardown calling s.disp.Close() -- confirm.
+ go s.disp.run()
+
+ // Neither "abc" nor "" matches the configured token "abcdefgh".
+ for _, token := range []string{"abc", ""} {
+ req := httptest.NewRequest("GET", "/arvados/v1/dispatch/instances", nil)
+ // An empty token means "send no Authorization header at all".
+ if token != "" {
+ req.Header.Set("Authorization", "Bearer "+token)
+ }
+ resp := httptest.NewRecorder()
+ s.disp.ServeHTTP(resp, req)
+ if token == "" {
+ // No credentials supplied.
+ c.Check(resp.Code, check.Equals, http.StatusUnauthorized)
+ } else {
+ // Credentials supplied, but they are the wrong ones.
+ c.Check(resp.Code, check.Equals, http.StatusForbidden)
+ }
+ }
+}
+
+// TestAPIDisabled checks that when the cluster's ManagementToken is
+// empty, GET /arvados/v1/dispatch/instances is answered with 403
+// Forbidden whether or not the request carries a bearer token.
+func (s *DispatcherSuite) TestAPIDisabled(c *check.C) {
+ s.cluster.ManagementToken = ""
+ drivers["test"] = s.stubDriver
+ s.disp.setupOnce.Do(s.disp.initialize)
+ // Swap in an empty test queue after initialization and before
+ // starting the dispatcher's run loop.
+ s.disp.queue = &test.Queue{}
+ // NOTE(review): this goroutine is presumably stopped by the suite's
+ // teardown calling s.disp.Close() -- confirm.
+ go s.disp.run()
+
+ // Try both with a bearer token ("abc") and with no Authorization
+ // header (""); the expected status is the same in both cases.
+ for _, token := range []string{"abc", ""} {
+ req := httptest.NewRequest("GET", "/arvados/v1/dispatch/instances", nil)
+ if token != "" {
+ req.Header.Set("Authorization", "Bearer "+token)
+ }
+ resp := httptest.NewRecorder()
+ s.disp.ServeHTTP(resp, req)
+ c.Check(resp.Code, check.Equals, http.StatusForbidden)
+ }
+}
+
func (s *DispatcherSuite) TestInstancesAPI(c *check.C) {
+ s.cluster.ManagementToken = "abcdefgh"
s.cluster.CloudVMs.TimeoutBooting = arvados.Duration(time.Second)
drivers["test"] = s.stubDriver
+ s.disp.setupOnce.Do(s.disp.initialize)
+ s.disp.queue = &test.Queue{}
+ go s.disp.run()
type instance struct {
Instance string
- WorkerState string
+ WorkerState string `json:"worker_state"`
Price float64
- LastContainerUUID string
- ArvadosInstanceType string
- ProviderInstanceType string
+ LastContainerUUID string `json:"last_container_uuid"`
+ ArvadosInstanceType string `json:"arvados_instance_type"`
+ ProviderInstanceType string `json:"provider_instance_type"`
}
type instancesResponse struct {
Items []instance
}
getInstances := func() instancesResponse {
req := httptest.NewRequest("GET", "/arvados/v1/dispatch/instances", nil)
+ req.Header.Set("Authorization", "Bearer abcdefgh")
resp := httptest.NewRecorder()
s.disp.ServeHTTP(resp, req)
var sr instancesResponse
+ c.Check(resp.Code, check.Equals, http.StatusOK)
err := json.Unmarshal(resp.Body.Bytes(), &sr)
c.Check(err, check.IsNil)
return sr
ch := s.disp.pool.Subscribe()
defer s.disp.pool.Unsubscribe(ch)
- err := s.disp.pool.Create(test.InstanceType(1))
- c.Check(err, check.IsNil)
+ ok := s.disp.pool.Create(test.InstanceType(1))
+ c.Check(ok, check.Equals, true)
<-ch
sr = getInstances()