13647: Use cluster config instead of custom keepstore config.
diff --git a/lib/dispatchcloud/dispatcher_test.go b/lib/dispatchcloud/dispatcher_test.go
index 3ac9370a30d65242eed947d1c55346efb77d94e2..1972468f47d697a6b210aa40653c80a4f5baab77 100644
--- a/lib/dispatchcloud/dispatcher_test.go
+++ b/lib/dispatchcloud/dispatcher_test.go
 package dispatchcloud
 
 import (
+       "context"
        "encoding/json"
-       "fmt"
-       "io"
        "io/ioutil"
        "math/rand"
+       "net/http"
        "net/http/httptest"
        "os"
-       "regexp"
-       "strings"
        "sync"
        "time"
 
-       "git.curoverse.com/arvados.git/lib/cloud"
        "git.curoverse.com/arvados.git/lib/dispatchcloud/test"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "github.com/Sirupsen/logrus"
+       "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+       "git.curoverse.com/arvados.git/sdk/go/ctxlog"
+       "github.com/prometheus/client_golang/prometheus"
        "golang.org/x/crypto/ssh"
        check "gopkg.in/check.v1"
 )
 
 var _ = check.Suite(&DispatcherSuite{})
 
-// fakeCloud provides an exec method that can be used as a
-// test.StubExecFunc. It calls the provided makeVM func when called
-// with a previously unseen instance ID. Calls to exec are passed on
-// to the *fakeVM for the appropriate instance ID.
-type fakeCloud struct {
-       queue      *test.Queue
-       makeVM     func(cloud.Instance) *fakeVM
-       onComplete func(string)
-       onCancel   func(string)
-       vms        map[cloud.InstanceID]*fakeVM
-       sync.Mutex
-}
-
-func (fc *fakeCloud) exec(inst cloud.Instance, command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
-       fc.Lock()
-       fvm, ok := fc.vms[inst.ID()]
-       if !ok {
-               if fc.vms == nil {
-                       fc.vms = make(map[cloud.InstanceID]*fakeVM)
-               }
-               fvm = fc.makeVM(inst)
-               fc.vms[inst.ID()] = fvm
-       }
-       fc.Unlock()
-       return fvm.exec(fc.queue, fc.onComplete, fc.onCancel, command, stdin, stdout, stderr)
-}
-
-// fakeVM is a fake VM with configurable delays and failure modes.
-type fakeVM struct {
-       boot                 time.Time
-       broken               time.Time
-       crunchRunMissing     bool
-       crunchRunCrashRate   float64
-       crunchRunDetachDelay time.Duration
-       ctrExit              int
-       running              map[string]bool
-       completed            []string
-       sync.Mutex
-}
-
-func (fvm *fakeVM) exec(queue *test.Queue, onComplete, onCancel func(uuid string), command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
-       uuid := regexp.MustCompile(`.{5}-dz642-.{15}`).FindString(command)
-       if eta := fvm.boot.Sub(time.Now()); eta > 0 {
-               fmt.Fprintf(stderr, "stub is booting, ETA %s\n", eta)
-               return 1
-       }
-       if !fvm.broken.IsZero() && fvm.broken.Before(time.Now()) {
-               fmt.Fprintf(stderr, "cannot fork\n")
-               return 2
-       }
-       if fvm.crunchRunMissing && strings.Contains(command, "crunch-run") {
-               fmt.Fprint(stderr, "crunch-run: command not found\n")
-               return 1
-       }
-       if strings.HasPrefix(command, "crunch-run --detach ") {
-               fvm.Lock()
-               if fvm.running == nil {
-                       fvm.running = map[string]bool{}
-               }
-               fvm.running[uuid] = true
-               fvm.Unlock()
-               time.Sleep(fvm.crunchRunDetachDelay)
-               fmt.Fprintf(stderr, "starting %s\n", uuid)
-               logger := logrus.WithField("ContainerUUID", uuid)
-               logger.Printf("[test] starting crunch-run stub")
-               go func() {
-                       crashluck := rand.Float64()
-                       ctr, ok := queue.Get(uuid)
-                       if !ok {
-                               logger.Print("[test] container not in queue")
-                               return
-                       }
-                       if crashluck > fvm.crunchRunCrashRate/2 {
-                               time.Sleep(time.Duration(rand.Float64()*20) * time.Millisecond)
-                               ctr.State = arvados.ContainerStateRunning
-                               queue.Notify(ctr)
-                       }
-
-                       time.Sleep(time.Duration(rand.Float64()*20) * time.Millisecond)
-                       fvm.Lock()
-                       _, running := fvm.running[uuid]
-                       fvm.Unlock()
-                       if !running {
-                               logger.Print("[test] container was killed")
-                               return
-                       }
-                       // TODO: Check whether the stub instance has
-                       // been destroyed, and if so, don't call
-                       // onComplete. Then "container finished twice"
-                       // can be classified as a bug.
-                       if crashluck < fvm.crunchRunCrashRate {
-                               logger.Print("[test] crashing crunch-run stub")
-                               if onCancel != nil && ctr.State == arvados.ContainerStateRunning {
-                                       onCancel(uuid)
-                               }
-                       } else {
-                               ctr.State = arvados.ContainerStateComplete
-                               ctr.ExitCode = fvm.ctrExit
-                               queue.Notify(ctr)
-                               if onComplete != nil {
-                                       onComplete(uuid)
-                               }
-                       }
-                       logger.Print("[test] exiting crunch-run stub")
-                       fvm.Lock()
-                       defer fvm.Unlock()
-                       delete(fvm.running, uuid)
-               }()
-               return 0
-       }
-       if command == "crunch-run --list" {
-               fvm.Lock()
-               defer fvm.Unlock()
-               for uuid := range fvm.running {
-                       fmt.Fprintf(stdout, "%s\n", uuid)
-               }
-               return 0
-       }
-       if strings.HasPrefix(command, "crunch-run --kill ") {
-               fvm.Lock()
-               defer fvm.Unlock()
-               if fvm.running[uuid] {
-                       delete(fvm.running, uuid)
-               } else {
-                       fmt.Fprintf(stderr, "%s: container is not running\n", uuid)
-               }
-               return 0
-       }
-       if command == "true" {
-               return 0
-       }
-       fmt.Fprintf(stderr, "%q: command not found", command)
-       return 1
-}
-
 type DispatcherSuite struct {
-       cluster     *arvados.Cluster
-       instanceSet *test.LameInstanceSet
-       stubDriver  *test.StubDriver
-       disp        *dispatcher
-}
-
-func (s *DispatcherSuite) SetUpSuite(c *check.C) {
-       if os.Getenv("ARVADOS_DEBUG") != "" {
-               logrus.StandardLogger().SetLevel(logrus.DebugLevel)
-       }
+       ctx        context.Context
+       cancel     context.CancelFunc
+       cluster    *arvados.Cluster
+       stubDriver *test.StubDriver
+       disp       *dispatcher
 }
 
 func (s *DispatcherSuite) SetUpTest(c *check.C) {
+       s.ctx, s.cancel = context.WithCancel(context.Background())
+       s.ctx = ctxlog.Context(s.ctx, ctxlog.TestLogger(c))
        dispatchpub, _ := test.LoadTestKey(c, "test/sshkey_dispatch")
        dispatchprivraw, err := ioutil.ReadFile("test/sshkey_dispatch")
        c.Assert(err, check.IsNil)
 
        _, hostpriv := test.LoadTestKey(c, "test/sshkey_vm")
        s.stubDriver = &test.StubDriver{
-               Exec: func(inst cloud.Instance, command string, _ io.Reader, _, _ io.Writer) uint32 {
-                       c.Logf("stubDriver SSHExecFunc(%s, %q, ...)", inst, command)
-                       return 1
-               },
-               HostKey:        hostpriv,
-               AuthorizedKeys: []ssh.PublicKey{dispatchpub},
+               HostKey:                   hostpriv,
+               AuthorizedKeys:            []ssh.PublicKey{dispatchpub},
+               ErrorRateDestroy:          0.1,
+               MinTimeBetweenCreateCalls: time.Millisecond,
        }
 
        s.cluster = &arvados.Cluster{
-               CloudVMs: arvados.CloudVMs{
-                       Driver:          "test",
-                       SyncInterval:    arvados.Duration(10 * time.Millisecond),
-                       TimeoutIdle:     arvados.Duration(30 * time.Millisecond),
-                       TimeoutBooting:  arvados.Duration(30 * time.Millisecond),
-                       TimeoutProbe:    arvados.Duration(15 * time.Millisecond),
-                       TimeoutShutdown: arvados.Duration(5 * time.Millisecond),
-               },
-               Dispatch: arvados.Dispatch{
-                       PrivateKey:         dispatchprivraw,
-                       PollInterval:       arvados.Duration(5 * time.Millisecond),
-                       ProbeInterval:      arvados.Duration(5 * time.Millisecond),
+               ManagementToken: "test-management-token",
+               Containers: arvados.ContainersConfig{
+                       DispatchPrivateKey: string(dispatchprivraw),
                        StaleLockTimeout:   arvados.Duration(5 * time.Millisecond),
-                       MaxProbesPerSecond: 1000,
+                       CloudVMs: arvados.CloudVMsConfig{
+                               Driver:               "test",
+                               SyncInterval:         arvados.Duration(10 * time.Millisecond),
+                               TimeoutIdle:          arvados.Duration(150 * time.Millisecond),
+                               TimeoutBooting:       arvados.Duration(150 * time.Millisecond),
+                               TimeoutProbe:         arvados.Duration(15 * time.Millisecond),
+                               TimeoutShutdown:      arvados.Duration(5 * time.Millisecond),
+                               MaxCloudOpsPerSecond: 500,
+                               PollInterval:         arvados.Duration(5 * time.Millisecond),
+                               ProbeInterval:        arvados.Duration(5 * time.Millisecond),
+                               MaxProbesPerSecond:   1000,
+                               TimeoutSignal:        arvados.Duration(3 * time.Millisecond),
+                               TimeoutTERM:          arvados.Duration(20 * time.Millisecond),
+                               ResourceTags:         map[string]string{"testtag": "test value"},
+                               TagKeyPrefix:         "test:",
+                       },
                },
                InstanceTypes: arvados.InstanceTypeMap{
                        test.InstanceType(1).Name:  test.InstanceType(1),
@@ -215,25 +80,36 @@ func (s *DispatcherSuite) SetUpTest(c *check.C) {
                        test.InstanceType(8).Name:  test.InstanceType(8),
                        test.InstanceType(16).Name: test.InstanceType(16),
                },
-               NodeProfiles: map[string]arvados.NodeProfile{
-                       "*": {
-                               Controller:    arvados.SystemServiceInstance{Listen: os.Getenv("ARVADOS_API_HOST")},
-                               DispatchCloud: arvados.SystemServiceInstance{Listen: ":"},
-                       },
-               },
        }
-       s.disp = &dispatcher{Cluster: s.cluster}
+       arvadostest.SetServiceURL(&s.cluster.Services.DispatchCloud, "http://localhost:/")
+       arvadostest.SetServiceURL(&s.cluster.Services.Controller, "https://"+os.Getenv("ARVADOS_API_HOST")+"/")
+
+       arvClient, err := arvados.NewClientFromConfig(s.cluster)
+       c.Check(err, check.IsNil)
+
+       s.disp = &dispatcher{
+               Cluster:   s.cluster,
+               Context:   s.ctx,
+               ArvClient: arvClient,
+               AuthToken: arvadostest.AdminToken,
+               Registry:  prometheus.NewRegistry(),
+       }
        // Test cases can modify s.cluster before calling
        // initialize(), and then modify private state before calling
        // go run().
 }
 
 func (s *DispatcherSuite) TearDownTest(c *check.C) {
+       s.cancel()
        s.disp.Close()
 }
 
+// DispatchToStubDriver checks that the dispatcher wires everything
+// together effectively. It uses a real scheduler and worker pool with
+// a fake queue and cloud driver. The fake cloud driver injects
+// artificial errors in order to exercise a variety of code paths.
 func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
-       drivers["test"] = s.stubDriver
+       Drivers["test"] = s.stubDriver
        s.disp.setupOnce.Do(s.disp.initialize)
        queue := &test.Queue{
                ChooseType: func(ctr *arvados.Container) (arvados.InstanceType, error) {
@@ -259,41 +135,41 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
        for _, ctr := range queue.Containers {
                waiting[ctr.UUID] = struct{}{}
        }
-       onComplete := func(uuid string) {
+       finishContainer := func(ctr arvados.Container) {
                mtx.Lock()
                defer mtx.Unlock()
-               if _, ok := waiting[uuid]; !ok {
-                       c.Logf("container completed twice: %s -- perhaps completed after stub instance was killed?", uuid)
+               if _, ok := waiting[ctr.UUID]; !ok {
+                       c.Errorf("container completed twice: %s", ctr.UUID)
+                       return
                }
-               delete(waiting, uuid)
+               delete(waiting, ctr.UUID)
                if len(waiting) == 0 {
                        close(done)
                }
        }
+       executeContainer := func(ctr arvados.Container) int {
+               finishContainer(ctr)
+               return int(rand.Uint32() & 0x3)
+       }
        n := 0
-       fc := &fakeCloud{
-               queue: queue,
-               makeVM: func(inst cloud.Instance) *fakeVM {
-                       n++
-                       fvm := &fakeVM{
-                               boot:                 time.Now().Add(time.Duration(rand.Int63n(int64(5 * time.Millisecond)))),
-                               crunchRunDetachDelay: time.Duration(rand.Int63n(int64(10 * time.Millisecond))),
-                               ctrExit:              int(rand.Uint32() & 0x3),
-                       }
-                       switch n % 7 {
-                       case 0:
-                               fvm.broken = time.Now().Add(time.Duration(rand.Int63n(90)) * time.Millisecond)
-                       case 1:
-                               fvm.crunchRunMissing = true
-                       default:
-                               fvm.crunchRunCrashRate = 0.1
-                       }
-                       return fvm
-               },
-               onComplete: onComplete,
-               onCancel:   onComplete,
+       s.stubDriver.Queue = queue
+       s.stubDriver.SetupVM = func(stubvm *test.StubVM) {
+               n++
+               stubvm.Boot = time.Now().Add(time.Duration(rand.Int63n(int64(5 * time.Millisecond))))
+               stubvm.CrunchRunDetachDelay = time.Duration(rand.Int63n(int64(10 * time.Millisecond)))
+               stubvm.ExecuteContainer = executeContainer
+               stubvm.CrashRunningContainer = finishContainer
+               switch n % 7 {
+               case 0:
+                       stubvm.Broken = time.Now().Add(time.Duration(rand.Int63n(90)) * time.Millisecond)
+               case 1:
+                       stubvm.CrunchRunMissing = true
+               case 2:
+                       stubvm.ReportBroken = time.Now().Add(time.Duration(rand.Int63n(200)) * time.Millisecond)
+               default:
+                       stubvm.CrunchRunCrashRate = 0.1
+               }
        }
-       s.stubDriver.Exec = fc.exec
 
        start := time.Now()
        go s.disp.run()
@@ -307,7 +183,7 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
                c.Fatalf("timed out; still waiting for %d containers: %q", len(waiting), waiting)
        }
 
-       deadline := time.Now().Add(time.Second)
+       deadline := time.Now().Add(5 * time.Second)
        for range time.NewTicker(10 * time.Millisecond).C {
                insts, err := s.stubDriver.InstanceSets()[0].Instances(nil)
                c.Check(err, check.IsNil)
@@ -320,28 +196,87 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
                        c.Fatalf("timed out with %d containers (%v), %d instances (%+v)", len(ents), ents, len(insts), insts)
                }
        }
+
+       req := httptest.NewRequest("GET", "/metrics", nil)
+       req.Header.Set("Authorization", "Bearer "+s.cluster.ManagementToken)
+       resp := httptest.NewRecorder()
+       s.disp.ServeHTTP(resp, req)
+       c.Check(resp.Code, check.Equals, http.StatusOK)
+       c.Check(resp.Body.String(), check.Matches, `(?ms).*driver_operations{error="0",operation="Create"} [^0].*`)
+       c.Check(resp.Body.String(), check.Matches, `(?ms).*driver_operations{error="0",operation="List"} [^0].*`)
+       c.Check(resp.Body.String(), check.Matches, `(?ms).*driver_operations{error="0",operation="Destroy"} [^0].*`)
+       c.Check(resp.Body.String(), check.Matches, `(?ms).*driver_operations{error="1",operation="Create"} [^0].*`)
+       c.Check(resp.Body.String(), check.Matches, `(?ms).*driver_operations{error="1",operation="List"} 0\n.*`)
+       c.Check(resp.Body.String(), check.Matches, `(?ms).*instances_disappeared{state="shutdown"} [^0].*`)
+       c.Check(resp.Body.String(), check.Matches, `(?ms).*instances_disappeared{state="unknown"} 0\n.*`)
+}
+
+func (s *DispatcherSuite) TestAPIPermissions(c *check.C) {
+       s.cluster.ManagementToken = "abcdefgh"
+       Drivers["test"] = s.stubDriver
+       s.disp.setupOnce.Do(s.disp.initialize)
+       s.disp.queue = &test.Queue{}
+       go s.disp.run()
+
+       for _, token := range []string{"abc", ""} {
+               req := httptest.NewRequest("GET", "/arvados/v1/dispatch/instances", nil)
+               if token != "" {
+                       req.Header.Set("Authorization", "Bearer "+token)
+               }
+               resp := httptest.NewRecorder()
+               s.disp.ServeHTTP(resp, req)
+               if token == "" {
+                       c.Check(resp.Code, check.Equals, http.StatusUnauthorized)
+               } else {
+                       c.Check(resp.Code, check.Equals, http.StatusForbidden)
+               }
+       }
+}
+
+func (s *DispatcherSuite) TestAPIDisabled(c *check.C) {
+       s.cluster.ManagementToken = ""
+       Drivers["test"] = s.stubDriver
+       s.disp.setupOnce.Do(s.disp.initialize)
+       s.disp.queue = &test.Queue{}
+       go s.disp.run()
+
+       for _, token := range []string{"abc", ""} {
+               req := httptest.NewRequest("GET", "/arvados/v1/dispatch/instances", nil)
+               if token != "" {
+                       req.Header.Set("Authorization", "Bearer "+token)
+               }
+               resp := httptest.NewRecorder()
+               s.disp.ServeHTTP(resp, req)
+               c.Check(resp.Code, check.Equals, http.StatusForbidden)
+       }
 }
 
 func (s *DispatcherSuite) TestInstancesAPI(c *check.C) {
-       s.cluster.CloudVMs.TimeoutBooting = arvados.Duration(time.Second)
-       drivers["test"] = s.stubDriver
+       s.cluster.ManagementToken = "abcdefgh"
+       s.cluster.Containers.CloudVMs.TimeoutBooting = arvados.Duration(time.Second)
+       Drivers["test"] = s.stubDriver
+       s.disp.setupOnce.Do(s.disp.initialize)
+       s.disp.queue = &test.Queue{}
+       go s.disp.run()
 
        type instance struct {
                Instance             string
-               WorkerState          string
+               WorkerState          string `json:"worker_state"`
                Price                float64
-               LastContainerUUID    string
-               ArvadosInstanceType  string
-               ProviderInstanceType string
+               LastContainerUUID    string `json:"last_container_uuid"`
+               ArvadosInstanceType  string `json:"arvados_instance_type"`
+               ProviderInstanceType string `json:"provider_instance_type"`
        }
        type instancesResponse struct {
                Items []instance
        }
        getInstances := func() instancesResponse {
                req := httptest.NewRequest("GET", "/arvados/v1/dispatch/instances", nil)
+               req.Header.Set("Authorization", "Bearer abcdefgh")
                resp := httptest.NewRecorder()
                s.disp.ServeHTTP(resp, req)
                var sr instancesResponse
+               c.Check(resp.Code, check.Equals, http.StatusOK)
                err := json.Unmarshal(resp.Body.Bytes(), &sr)
                c.Check(err, check.IsNil)
                return sr
@@ -352,11 +287,17 @@ func (s *DispatcherSuite) TestInstancesAPI(c *check.C) {
 
        ch := s.disp.pool.Subscribe()
        defer s.disp.pool.Unsubscribe(ch)
-       err := s.disp.pool.Create(test.InstanceType(1))
-       c.Check(err, check.IsNil)
+       ok := s.disp.pool.Create(test.InstanceType(1))
+       c.Check(ok, check.Equals, true)
        <-ch
 
-       sr = getInstances()
+       for deadline := time.Now().Add(time.Second); time.Now().Before(deadline); {
+               sr = getInstances()
+               if len(sr.Items) > 0 {
+                       break
+               }
+               time.Sleep(time.Millisecond)
+       }
        c.Assert(len(sr.Items), check.Equals, 1)
        c.Check(sr.Items[0].Instance, check.Matches, "stub.*")
        c.Check(sr.Items[0].WorkerState, check.Equals, "booting")