1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
21 "git.curoverse.com/arvados.git/lib/cloud"
22 "git.curoverse.com/arvados.git/lib/dispatchcloud/test"
23 "git.curoverse.com/arvados.git/sdk/go/arvados"
24 "github.com/Sirupsen/logrus"
25 "golang.org/x/crypto/ssh"
26 check "gopkg.in/check.v1"
// Register DispatcherSuite with gocheck so that its Test* methods are run.
29 var _ = check.Suite(&DispatcherSuite{})
31 // fakeCloud provides an exec method that can be used as a
32 // test.StubExecFunc. It calls the provided makeVM func when called
33 // with a previously unseen instance ID. Calls to exec are passed on
34 // to the *fakeVM for the appropriate instance ID.
35 type fakeCloud struct {
// makeVM builds the *fakeVM for an instance the first time exec sees
// that instance's ID.
37 makeVM func(cloud.Instance) *fakeVM
// onComplete is forwarded to fakeVM.exec, which invokes it with a
// container UUID when the stub crunch-run finishes that container.
38 onComplete func(string)
// vms caches one *fakeVM per instance ID. exec allocates it lazily.
40 vms map[cloud.InstanceID]*fakeVM
// exec looks up the fakeVM for inst, creating it with makeVM on first
// sight, and delegates command to it. It returns whatever fvm.exec
// returns, presumably the stub command's exit status.
44 func (fc *fakeCloud) exec(inst cloud.Instance, command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
46 fvm, ok := fc.vms[inst.ID()]
// The vms map is created lazily on first use.
49 fc.vms = make(map[cloud.InstanceID]*fakeVM)
// Cache the new VM so later commands for this instance reach the same fake.
52 fc.vms[inst.ID()] = fvm
55 return fvm.exec(fc.queue, fc.onComplete, fc.onCancel, command, stdin, stdout, stderr)
58 // fakeVM is a fake VM with configurable delays and failure modes.
// crunchRunCrashRate is compared against one uniform random draw per
// started container:
//   - below rate/2, the container is never marked Running;
//   - below rate, the stub crashes instead of completing the container.
63 crunchRunCrashRate float64
// crunchRunDetachDelay is how long "crunch-run --detach" sleeps before
// it prints "starting <uuid>".
64 crunchRunDetachDelay time.Duration
// running holds the UUIDs of containers with an active crunch-run stub.
// It backs "crunch-run --list" and "crunch-run --kill".
66 running map[string]bool
// exec emulates running command on this fake VM.
//
// Before looking at the command it simulates three failure modes:
//   - an unfinished boot;
//   - a scheduled "broken" time that has passed;
//   - a missing crunch-run binary.
//
// It then recognizes "crunch-run --detach <uuid>", "crunch-run --list",
// "crunch-run --kill <uuid>" and "true". Any other command is reported
// as not found on stderr.
//
// NOTE(review): the UUID regexp below is recompiled on every call.
// Hoisting it to a package-level var would avoid the repeated work.
71 func (fvm *fakeVM) exec(queue *test.Queue, onComplete, onCancel func(uuid string), command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
// Pick the container UUID (the "-dz642-" infix) out of the command line,
// if present.
72 uuid := regexp.MustCompile(`.{5}-dz642-.{15}`).FindString(command)
// Until the simulated boot time arrives, report an ETA on stderr
// instead of running the command.
73 if eta := fvm.boot.Sub(time.Now()); eta > 0 {
74 fmt.Fprintf(stderr, "stub is booting, ETA %s\n", eta)
// Once the scheduled breakage time has passed, every command reports
// "cannot fork".
77 if !fvm.broken.IsZero() && fvm.broken.Before(time.Now()) {
78 fmt.Fprintf(stderr, "cannot fork\n")
// Simulate a VM image that lacks the crunch-run binary.
81 if fvm.crunchRunMissing && strings.Contains(command, "crunch-run") {
82 fmt.Fprint(stderr, "crunch-run: command not found\n")
85 if strings.HasPrefix(command, "crunch-run --detach ") {
// Record the container as running, creating the map on first use.
87 if fvm.running == nil {
88 fvm.running = map[string]bool{}
90 fvm.running[uuid] = true
92 time.Sleep(fvm.crunchRunDetachDelay)
93 fmt.Fprintf(stderr, "starting %s\n", uuid)
94 logger := logrus.WithField("ContainerUUID", uuid)
95 logger.Printf("[test] starting crunch-run stub")
// A single random draw decides both whether the container reaches
// Running (crashluck > rate/2) and whether the stub crashes
// (crashluck < rate).
97 crashluck := rand.Float64()
// Fetch the container from the fake queue. If it is absent, log it and
// (presumably) stop.
98 ctr, ok := queue.Get(uuid)
100 logger.Print("[test] container not in queue")
// After a random 0-19ms delay, move the container to Running, unless
// the draw fell in the low half of the crash range.
103 if crashluck > fvm.crunchRunCrashRate/2 {
104 time.Sleep(time.Duration(rand.Float64()*20) * time.Millisecond)
105 ctr.State = arvados.ContainerStateRunning
// Wait another random 0-19ms, then check whether "crunch-run --kill"
// removed the container in the meantime.
109 time.Sleep(time.Duration(rand.Float64()*20) * time.Millisecond)
111 _, running := fvm.running[uuid]
114 logger.Print("[test] container was killed")
117 // TODO: Check whether the stub instance has
118 // been destroyed, and if so, don't call
119 // onComplete. Then "container finished twice"
120 // can be classified as a bug.
// Crash path: the container does not complete. onCancel is only
// considered when the container had already reached Running.
121 if crashluck < fvm.crunchRunCrashRate {
122 logger.Print("[test] crashing crunch-run stub")
123 if onCancel != nil && ctr.State == arvados.ContainerStateRunning {
// Normal path: complete the container with this VM's configured exit
// code and notify onComplete, if it is set.
127 ctr.State = arvados.ContainerStateComplete
128 ctr.ExitCode = fvm.ctrExit
130 if onComplete != nil {
134 logger.Print("[test] exiting crunch-run stub")
// The stub has exited, so the container is no longer listed as running.
137 delete(fvm.running, uuid)
// List running containers, one UUID per line, on stdout.
141 if command == "crunch-run --list" {
144 for uuid := range fvm.running {
145 fmt.Fprintf(stdout, "%s\n", uuid)
// Kill: forget a running container, or report that it is not running.
149 if strings.HasPrefix(command, "crunch-run --kill ") {
152 if fvm.running[uuid] {
153 delete(fvm.running, uuid)
155 fmt.Fprintf(stderr, "%s: container is not running\n", uuid)
// "true" is accepted, presumably as a successful liveness probe.
159 if command == "true" {
// Anything else is unknown.
// NOTE(review): unlike the other stderr messages, this one has no
// trailing newline.
162 fmt.Fprintf(stderr, "%q: command not found", command)
// DispatcherSuite holds the fixtures that SetUpTest rebuilds for each
// dispatcher test.
166 type DispatcherSuite struct {
// cluster is the test configuration. Tests may adjust it before the
// dispatcher is initialized.
167 cluster *arvados.Cluster
// NOTE(review): instanceSet's use is not visible in this excerpt.
168 instanceSet *test.LameInstanceSet
// stubDriver is the fake cloud driver. Tests register it as drivers["test"].
169 stubDriver *test.StubDriver
// SetUpSuite switches the standard logrus logger to debug level when the
// ARVADOS_DEBUG environment variable is set to any non-empty value.
173 func (s *DispatcherSuite) SetUpSuite(c *check.C) {
174 if os.Getenv("ARVADOS_DEBUG") != "" {
175 logrus.StandardLogger().SetLevel(logrus.DebugLevel)
// SetUpTest builds a fresh stub cloud driver, cluster config and
// dispatcher before each test. Intervals and timeouts are a few
// milliseconds so the tests run quickly.
179 func (s *DispatcherSuite) SetUpTest(c *check.C) {
// The dispatcher's key pair: the public half is authorized on stub VMs,
// and the raw private key goes into the dispatch config below.
180 dispatchpub, _ := test.LoadTestKey(c, "test/sshkey_dispatch")
181 dispatchprivraw, err := ioutil.ReadFile("test/sshkey_dispatch")
182 c.Assert(err, check.IsNil)
184 _, hostpriv := test.LoadTestKey(c, "test/sshkey_vm")
185 s.stubDriver = &test.StubDriver{
// The default Exec only logs each command. Tests that need VM behaviour
// replace s.stubDriver.Exec.
186 Exec: func(inst cloud.Instance, command string, _ io.Reader, _, _ io.Writer) uint32 {
187 c.Logf("stubDriver SSHExecFunc(%s, %q, ...)", inst, command)
191 AuthorizedKeys: []ssh.PublicKey{dispatchpub},
// Presumably makes the stub fail about 10% of Destroy calls, to exercise
// error handling. Confirm against test.StubDriver.
193 ErrorRateDestroy: 0.1,
196 s.cluster = &arvados.Cluster{
197 CloudVMs: arvados.CloudVMs{
199 SyncInterval: arvados.Duration(10 * time.Millisecond),
200 TimeoutIdle: arvados.Duration(30 * time.Millisecond),
201 TimeoutBooting: arvados.Duration(30 * time.Millisecond),
202 TimeoutProbe: arvados.Duration(15 * time.Millisecond),
203 TimeoutShutdown: arvados.Duration(5 * time.Millisecond),
205 Dispatch: arvados.Dispatch{
206 PrivateKey: dispatchprivraw,
207 PollInterval: arvados.Duration(5 * time.Millisecond),
208 ProbeInterval: arvados.Duration(5 * time.Millisecond),
209 StaleLockTimeout: arvados.Duration(5 * time.Millisecond),
210 MaxProbesPerSecond: 1000,
// Offer stub instance types of several sizes (1, 2, 3, 4, 6, 8, 16).
212 InstanceTypes: arvados.InstanceTypeMap{
213 test.InstanceType(1).Name: test.InstanceType(1),
214 test.InstanceType(2).Name: test.InstanceType(2),
215 test.InstanceType(3).Name: test.InstanceType(3),
216 test.InstanceType(4).Name: test.InstanceType(4),
217 test.InstanceType(6).Name: test.InstanceType(6),
218 test.InstanceType(8).Name: test.InstanceType(8),
219 test.InstanceType(16).Name: test.InstanceType(16),
221 NodeProfiles: map[string]arvados.NodeProfile{
223 Controller: arvados.SystemServiceInstance{Listen: os.Getenv("ARVADOS_API_HOST")},
// Presumably lets the dispatch service listen on any available port.
224 DispatchCloud: arvados.SystemServiceInstance{Listen: ":"},
228 s.disp = &dispatcher{Cluster: s.cluster}
229 // Test cases can modify s.cluster before calling
230 // initialize(), and then modify private state before calling
// TearDownTest is run by gocheck after each test in DispatcherSuite.
234 func (s *DispatcherSuite) TearDownTest(c *check.C) {
238 // DispatchToStubDriver checks that the dispatcher wires everything
239 // together effectively. It uses a real scheduler and worker pool with
240 // a fake queue and cloud driver. The fake cloud driver injects
241 // artificial errors in order to exercise a variety of code paths.
242 func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
// Register the stub driver under the name "test".
243 drivers["test"] = s.stubDriver
// Initialize the dispatcher through its setupOnce guard.
244 s.disp.setupOnce.Do(s.disp.initialize)
245 queue := &test.Queue{
246 ChooseType: func(ctr *arvados.Container) (arvados.InstanceType, error) {
247 return ChooseInstanceType(s.cluster, ctr)
// Enqueue 200 containers with priorities 1..20 and RAM requirements of
// 1, 2 or 3 GiB.
250 for i := 0; i < 200; i++ {
251 queue.Containers = append(queue.Containers, arvados.Container{
252 UUID: test.ContainerUUID(i + 1),
253 State: arvados.ContainerStateQueued,
254 Priority: int64(i%20 + 1),
255 RuntimeConstraints: arvados.RuntimeConstraints{
256 RAM: int64(i%3+1) << 30,
// waiting holds the UUIDs that have not completed yet. done is
// presumably closed when waiting becomes empty.
264 done := make(chan struct{})
265 waiting := map[string]struct{}{}
266 for _, ctr := range queue.Containers {
267 waiting[ctr.UUID] = struct{}{}
269 onComplete := func(uuid string) {
// A repeated completion is only logged, not failed (see the TODO in
// fakeVM.exec).
272 if _, ok := waiting[uuid]; !ok {
273 c.Logf("container completed twice: %s -- perhaps completed after stub instance was killed?", uuid)
275 delete(waiting, uuid)
276 if len(waiting) == 0 {
// Each new instance gets:
//   - a random boot delay (<5ms) and detach delay (<10ms);
//   - a container exit code of 0-3.
// Some instances are also made to break within 90ms, to lack crunch-run,
// or to crash 10% of their containers.
283 makeVM: func(inst cloud.Instance) *fakeVM {
286 boot: time.Now().Add(time.Duration(rand.Int63n(int64(5 * time.Millisecond)))),
287 crunchRunDetachDelay: time.Duration(rand.Int63n(int64(10 * time.Millisecond))),
288 ctrExit: int(rand.Uint32() & 0x3),
292 fvm.broken = time.Now().Add(time.Duration(rand.Int63n(90)) * time.Millisecond)
294 fvm.crunchRunMissing = true
296 fvm.crunchRunCrashRate = 0.1
// Canceled containers count as finished, the same as completed ones.
300 onComplete: onComplete,
301 onCancel: onComplete,
// Route every stub SSH command through the fake cloud.
303 s.stubDriver.Exec = fc.exec
307 err := s.disp.CheckHealth()
308 c.Check(err, check.IsNil)
// Allow up to 10s for every container to finish.
312 c.Logf("containers finished (%s), waiting for instances to shutdown and queue to clear", time.Since(start))
313 case <-time.After(10 * time.Second):
314 c.Fatalf("timed out; still waiting for %d containers: %q", len(waiting), waiting)
// Then allow up to 1s, polling every 10ms, for the queue to drain and
// all instances to be destroyed.
// NOTE(review): this ticker is never stopped. Keeping it in a variable
// and deferring Stop() would release it when the test returns.
317 deadline := time.Now().Add(time.Second)
318 for range time.NewTicker(10 * time.Millisecond).C {
319 insts, err := s.stubDriver.InstanceSets()[0].Instances(nil)
320 c.Check(err, check.IsNil)
322 ents, _ := queue.Entries()
323 if len(ents) == 0 && len(insts) == 0 {
326 if time.Now().After(deadline) {
327 c.Fatalf("timed out with %d containers (%v), %d instances (%+v)", len(ents), ents, len(insts), insts)
// TestAPIPermissions checks that, with a management token configured,
// the instances API rejects requests that lack the right token. A wrong
// token ("abc") and no token ("") are both tried. Presumably the empty
// token expects 401 Unauthorized and the wrong token expects
// 403 Forbidden.
332 func (s *DispatcherSuite) TestAPIPermissions(c *check.C) {
333 drivers["test"] = s.stubDriver
334 s.cluster.ManagementToken = "abcdefgh"
335 for _, token := range []string{"abc", ""} {
336 req := httptest.NewRequest("GET", "/arvados/v1/dispatch/instances", nil)
// Presumably the header is only sent for a non-empty token.
338 req.Header.Set("Authorization", "Bearer "+token)
340 resp := httptest.NewRecorder()
341 s.disp.ServeHTTP(resp, req)
343 c.Check(resp.Code, check.Equals, http.StatusUnauthorized)
345 c.Check(resp.Code, check.Equals, http.StatusForbidden)
// TestAPIDisabled checks the case of an empty ManagementToken. The
// instances API must then answer 403 Forbidden to every request, with a
// token ("abc") or without one ("").
350 func (s *DispatcherSuite) TestAPIDisabled(c *check.C) {
351 drivers["test"] = s.stubDriver
352 s.cluster.ManagementToken = ""
353 for _, token := range []string{"abc", ""} {
354 req := httptest.NewRequest("GET", "/arvados/v1/dispatch/instances", nil)
356 req.Header.Set("Authorization", "Bearer "+token)
358 resp := httptest.NewRecorder()
359 s.disp.ServeHTTP(resp, req)
360 c.Check(resp.Code, check.Equals, http.StatusForbidden)
364 func (s *DispatcherSuite) TestInstancesAPI(c *check.C) {
365 s.cluster.ManagementToken = "abcdefgh"
366 s.cluster.CloudVMs.TimeoutBooting = arvados.Duration(time.Second)
367 drivers["test"] = s.stubDriver
369 type instance struct {
373 LastContainerUUID string
374 ArvadosInstanceType string
375 ProviderInstanceType string
377 type instancesResponse struct {
380 getInstances := func() instancesResponse {
381 req := httptest.NewRequest("GET", "/arvados/v1/dispatch/instances", nil)
382 req.Header.Set("Authorization", "Bearer abcdefgh")
383 resp := httptest.NewRecorder()
384 s.disp.ServeHTTP(resp, req)
385 var sr instancesResponse
386 c.Check(resp.Code, check.Equals, http.StatusOK)
387 err := json.Unmarshal(resp.Body.Bytes(), &sr)
388 c.Check(err, check.IsNil)
393 c.Check(len(sr.Items), check.Equals, 0)
395 ch := s.disp.pool.Subscribe()
396 defer s.disp.pool.Unsubscribe(ch)
397 err := s.disp.pool.Create(test.InstanceType(1))
398 c.Check(err, check.IsNil)
402 c.Assert(len(sr.Items), check.Equals, 1)
403 c.Check(sr.Items[0].Instance, check.Matches, "stub.*")
404 c.Check(sr.Items[0].WorkerState, check.Equals, "booting")
405 c.Check(sr.Items[0].Price, check.Equals, 0.123)
406 c.Check(sr.Items[0].LastContainerUUID, check.Equals, "")
407 c.Check(sr.Items[0].ProviderInstanceType, check.Equals, test.InstanceType(1).ProviderType)
408 c.Check(sr.Items[0].ArvadosInstanceType, check.Equals, test.InstanceType(1).Name)