14360: Fix log spam on normal race condition.
[arvados.git] / lib / dispatchcloud / dispatcher_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package dispatchcloud
6
7 import (
8         "encoding/json"
9         "fmt"
10         "io"
11         "io/ioutil"
12         "math/rand"
13         "net/http"
14         "net/http/httptest"
15         "os"
16         "regexp"
17         "strings"
18         "sync"
19         "time"
20
21         "git.curoverse.com/arvados.git/lib/cloud"
22         "git.curoverse.com/arvados.git/lib/dispatchcloud/test"
23         "git.curoverse.com/arvados.git/sdk/go/arvados"
24         "github.com/Sirupsen/logrus"
25         "golang.org/x/crypto/ssh"
26         check "gopkg.in/check.v1"
27 )
28
29 var _ = check.Suite(&DispatcherSuite{})
30
31 // fakeCloud provides an exec method that can be used as a
32 // test.StubExecFunc. It calls the provided makeVM func when called
33 // with a previously unseen instance ID. Calls to exec are passed on
34 // to the *fakeVM for the appropriate instance ID.
35 type fakeCloud struct {
36         queue      *test.Queue
37         makeVM     func(cloud.Instance) *fakeVM
38         onComplete func(string)
39         onCancel   func(string)
40         vms        map[cloud.InstanceID]*fakeVM
41         sync.Mutex
42 }
43
44 func (fc *fakeCloud) exec(inst cloud.Instance, command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
45         fc.Lock()
46         fvm, ok := fc.vms[inst.ID()]
47         if !ok {
48                 if fc.vms == nil {
49                         fc.vms = make(map[cloud.InstanceID]*fakeVM)
50                 }
51                 fvm = fc.makeVM(inst)
52                 fc.vms[inst.ID()] = fvm
53         }
54         fc.Unlock()
55         return fvm.exec(fc.queue, fc.onComplete, fc.onCancel, command, stdin, stdout, stderr)
56 }
57
// fakeVM simulates a single cloud VM whose boot time, breakage time,
// and crunch-run behavior (missing, crashing, slow to detach) can be
// configured per instance.
type fakeVM struct {
	boot                 time.Time       // commands fail with "booting" until this time
	broken               time.Time       // if non-zero, commands fail with "cannot fork" after this time
	crunchRunMissing     bool            // any crunch-run command fails with "command not found"
	crunchRunCrashRate   float64         // probability that a detached crunch-run stub crashes
	crunchRunDetachDelay time.Duration   // delay before "crunch-run --detach" returns
	ctrExit              int             // exit code recorded for containers that complete
	running              map[string]bool // container UUIDs with a live stub; guarded by the mutex
	completed            []string        // not referenced in this file
	sync.Mutex
}
70
71 func (fvm *fakeVM) exec(queue *test.Queue, onComplete, onCancel func(uuid string), command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
72         uuid := regexp.MustCompile(`.{5}-dz642-.{15}`).FindString(command)
73         if eta := fvm.boot.Sub(time.Now()); eta > 0 {
74                 fmt.Fprintf(stderr, "stub is booting, ETA %s\n", eta)
75                 return 1
76         }
77         if !fvm.broken.IsZero() && fvm.broken.Before(time.Now()) {
78                 fmt.Fprintf(stderr, "cannot fork\n")
79                 return 2
80         }
81         if fvm.crunchRunMissing && strings.Contains(command, "crunch-run") {
82                 fmt.Fprint(stderr, "crunch-run: command not found\n")
83                 return 1
84         }
85         if strings.HasPrefix(command, "crunch-run --detach ") {
86                 fvm.Lock()
87                 if fvm.running == nil {
88                         fvm.running = map[string]bool{}
89                 }
90                 fvm.running[uuid] = true
91                 fvm.Unlock()
92                 time.Sleep(fvm.crunchRunDetachDelay)
93                 fmt.Fprintf(stderr, "starting %s\n", uuid)
94                 logger := logrus.WithField("ContainerUUID", uuid)
95                 logger.Printf("[test] starting crunch-run stub")
96                 go func() {
97                         crashluck := rand.Float64()
98                         ctr, ok := queue.Get(uuid)
99                         if !ok {
100                                 logger.Print("[test] container not in queue")
101                                 return
102                         }
103                         if crashluck > fvm.crunchRunCrashRate/2 {
104                                 time.Sleep(time.Duration(rand.Float64()*20) * time.Millisecond)
105                                 ctr.State = arvados.ContainerStateRunning
106                                 queue.Notify(ctr)
107                         }
108
109                         time.Sleep(time.Duration(rand.Float64()*20) * time.Millisecond)
110                         fvm.Lock()
111                         _, running := fvm.running[uuid]
112                         fvm.Unlock()
113                         if !running {
114                                 logger.Print("[test] container was killed")
115                                 return
116                         }
117                         // TODO: Check whether the stub instance has
118                         // been destroyed, and if so, don't call
119                         // onComplete. Then "container finished twice"
120                         // can be classified as a bug.
121                         if crashluck < fvm.crunchRunCrashRate {
122                                 logger.Print("[test] crashing crunch-run stub")
123                                 if onCancel != nil && ctr.State == arvados.ContainerStateRunning {
124                                         onCancel(uuid)
125                                 }
126                         } else {
127                                 ctr.State = arvados.ContainerStateComplete
128                                 ctr.ExitCode = fvm.ctrExit
129                                 queue.Notify(ctr)
130                                 if onComplete != nil {
131                                         onComplete(uuid)
132                                 }
133                         }
134                         logger.Print("[test] exiting crunch-run stub")
135                         fvm.Lock()
136                         defer fvm.Unlock()
137                         delete(fvm.running, uuid)
138                 }()
139                 return 0
140         }
141         if command == "crunch-run --list" {
142                 fvm.Lock()
143                 defer fvm.Unlock()
144                 for uuid := range fvm.running {
145                         fmt.Fprintf(stdout, "%s\n", uuid)
146                 }
147                 return 0
148         }
149         if strings.HasPrefix(command, "crunch-run --kill ") {
150                 fvm.Lock()
151                 defer fvm.Unlock()
152                 if fvm.running[uuid] {
153                         delete(fvm.running, uuid)
154                 } else {
155                         fmt.Fprintf(stderr, "%s: container is not running\n", uuid)
156                 }
157                 return 0
158         }
159         if command == "true" {
160                 return 0
161         }
162         fmt.Fprintf(stderr, "%q: command not found", command)
163         return 1
164 }
165
166 type DispatcherSuite struct {
167         cluster     *arvados.Cluster
168         instanceSet *test.LameInstanceSet
169         stubDriver  *test.StubDriver
170         disp        *dispatcher
171 }
172
173 func (s *DispatcherSuite) SetUpSuite(c *check.C) {
174         if os.Getenv("ARVADOS_DEBUG") != "" {
175                 logrus.StandardLogger().SetLevel(logrus.DebugLevel)
176         }
177 }
178
179 func (s *DispatcherSuite) SetUpTest(c *check.C) {
180         dispatchpub, _ := test.LoadTestKey(c, "test/sshkey_dispatch")
181         dispatchprivraw, err := ioutil.ReadFile("test/sshkey_dispatch")
182         c.Assert(err, check.IsNil)
183
184         _, hostpriv := test.LoadTestKey(c, "test/sshkey_vm")
185         s.stubDriver = &test.StubDriver{
186                 Exec: func(inst cloud.Instance, command string, _ io.Reader, _, _ io.Writer) uint32 {
187                         c.Logf("stubDriver SSHExecFunc(%s, %q, ...)", inst, command)
188                         return 1
189                 },
190                 HostKey:        hostpriv,
191                 AuthorizedKeys: []ssh.PublicKey{dispatchpub},
192
193                 ErrorRateDestroy: 0.1,
194         }
195
196         s.cluster = &arvados.Cluster{
197                 CloudVMs: arvados.CloudVMs{
198                         Driver:          "test",
199                         SyncInterval:    arvados.Duration(10 * time.Millisecond),
200                         TimeoutIdle:     arvados.Duration(30 * time.Millisecond),
201                         TimeoutBooting:  arvados.Duration(30 * time.Millisecond),
202                         TimeoutProbe:    arvados.Duration(15 * time.Millisecond),
203                         TimeoutShutdown: arvados.Duration(5 * time.Millisecond),
204                 },
205                 Dispatch: arvados.Dispatch{
206                         PrivateKey:         dispatchprivraw,
207                         PollInterval:       arvados.Duration(5 * time.Millisecond),
208                         ProbeInterval:      arvados.Duration(5 * time.Millisecond),
209                         StaleLockTimeout:   arvados.Duration(5 * time.Millisecond),
210                         MaxProbesPerSecond: 1000,
211                 },
212                 InstanceTypes: arvados.InstanceTypeMap{
213                         test.InstanceType(1).Name:  test.InstanceType(1),
214                         test.InstanceType(2).Name:  test.InstanceType(2),
215                         test.InstanceType(3).Name:  test.InstanceType(3),
216                         test.InstanceType(4).Name:  test.InstanceType(4),
217                         test.InstanceType(6).Name:  test.InstanceType(6),
218                         test.InstanceType(8).Name:  test.InstanceType(8),
219                         test.InstanceType(16).Name: test.InstanceType(16),
220                 },
221                 NodeProfiles: map[string]arvados.NodeProfile{
222                         "*": {
223                                 Controller:    arvados.SystemServiceInstance{Listen: os.Getenv("ARVADOS_API_HOST")},
224                                 DispatchCloud: arvados.SystemServiceInstance{Listen: ":"},
225                         },
226                 },
227         }
228         s.disp = &dispatcher{Cluster: s.cluster}
229         // Test cases can modify s.cluster before calling
230         // initialize(), and then modify private state before calling
231         // go run().
232 }
233
234 func (s *DispatcherSuite) TearDownTest(c *check.C) {
235         s.disp.Close()
236 }
237
238 // DispatchToStubDriver checks that the dispatcher wires everything
239 // together effectively. It uses a real scheduler and worker pool with
240 // a fake queue and cloud driver. The fake cloud driver injects
241 // artificial errors in order to exercise a variety of code paths.
242 func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
243         drivers["test"] = s.stubDriver
244         s.disp.setupOnce.Do(s.disp.initialize)
245         queue := &test.Queue{
246                 ChooseType: func(ctr *arvados.Container) (arvados.InstanceType, error) {
247                         return ChooseInstanceType(s.cluster, ctr)
248                 },
249         }
250         for i := 0; i < 200; i++ {
251                 queue.Containers = append(queue.Containers, arvados.Container{
252                         UUID:     test.ContainerUUID(i + 1),
253                         State:    arvados.ContainerStateQueued,
254                         Priority: int64(i%20 + 1),
255                         RuntimeConstraints: arvados.RuntimeConstraints{
256                                 RAM:   int64(i%3+1) << 30,
257                                 VCPUs: i%8 + 1,
258                         },
259                 })
260         }
261         s.disp.queue = queue
262
263         var mtx sync.Mutex
264         done := make(chan struct{})
265         waiting := map[string]struct{}{}
266         for _, ctr := range queue.Containers {
267                 waiting[ctr.UUID] = struct{}{}
268         }
269         onComplete := func(uuid string) {
270                 mtx.Lock()
271                 defer mtx.Unlock()
272                 if _, ok := waiting[uuid]; !ok {
273                         c.Logf("container completed twice: %s -- perhaps completed after stub instance was killed?", uuid)
274                 }
275                 delete(waiting, uuid)
276                 if len(waiting) == 0 {
277                         close(done)
278                 }
279         }
280         n := 0
281         fc := &fakeCloud{
282                 queue: queue,
283                 makeVM: func(inst cloud.Instance) *fakeVM {
284                         n++
285                         fvm := &fakeVM{
286                                 boot:                 time.Now().Add(time.Duration(rand.Int63n(int64(5 * time.Millisecond)))),
287                                 crunchRunDetachDelay: time.Duration(rand.Int63n(int64(10 * time.Millisecond))),
288                                 ctrExit:              int(rand.Uint32() & 0x3),
289                         }
290                         switch n % 7 {
291                         case 0:
292                                 fvm.broken = time.Now().Add(time.Duration(rand.Int63n(90)) * time.Millisecond)
293                         case 1:
294                                 fvm.crunchRunMissing = true
295                         default:
296                                 fvm.crunchRunCrashRate = 0.1
297                         }
298                         return fvm
299                 },
300                 onComplete: onComplete,
301                 onCancel:   onComplete,
302         }
303         s.stubDriver.Exec = fc.exec
304
305         start := time.Now()
306         go s.disp.run()
307         err := s.disp.CheckHealth()
308         c.Check(err, check.IsNil)
309
310         select {
311         case <-done:
312                 c.Logf("containers finished (%s), waiting for instances to shutdown and queue to clear", time.Since(start))
313         case <-time.After(10 * time.Second):
314                 c.Fatalf("timed out; still waiting for %d containers: %q", len(waiting), waiting)
315         }
316
317         deadline := time.Now().Add(time.Second)
318         for range time.NewTicker(10 * time.Millisecond).C {
319                 insts, err := s.stubDriver.InstanceSets()[0].Instances(nil)
320                 c.Check(err, check.IsNil)
321                 queue.Update()
322                 ents, _ := queue.Entries()
323                 if len(ents) == 0 && len(insts) == 0 {
324                         break
325                 }
326                 if time.Now().After(deadline) {
327                         c.Fatalf("timed out with %d containers (%v), %d instances (%+v)", len(ents), ents, len(insts), insts)
328                 }
329         }
330 }
331
332 func (s *DispatcherSuite) TestAPIPermissions(c *check.C) {
333         drivers["test"] = s.stubDriver
334         s.cluster.ManagementToken = "abcdefgh"
335         for _, token := range []string{"abc", ""} {
336                 req := httptest.NewRequest("GET", "/arvados/v1/dispatch/instances", nil)
337                 if token != "" {
338                         req.Header.Set("Authorization", "Bearer "+token)
339                 }
340                 resp := httptest.NewRecorder()
341                 s.disp.ServeHTTP(resp, req)
342                 if token == "" {
343                         c.Check(resp.Code, check.Equals, http.StatusUnauthorized)
344                 } else {
345                         c.Check(resp.Code, check.Equals, http.StatusForbidden)
346                 }
347         }
348 }
349
350 func (s *DispatcherSuite) TestAPIDisabled(c *check.C) {
351         drivers["test"] = s.stubDriver
352         s.cluster.ManagementToken = ""
353         for _, token := range []string{"abc", ""} {
354                 req := httptest.NewRequest("GET", "/arvados/v1/dispatch/instances", nil)
355                 if token != "" {
356                         req.Header.Set("Authorization", "Bearer "+token)
357                 }
358                 resp := httptest.NewRecorder()
359                 s.disp.ServeHTTP(resp, req)
360                 c.Check(resp.Code, check.Equals, http.StatusForbidden)
361         }
362 }
363
364 func (s *DispatcherSuite) TestInstancesAPI(c *check.C) {
365         s.cluster.ManagementToken = "abcdefgh"
366         s.cluster.CloudVMs.TimeoutBooting = arvados.Duration(time.Second)
367         drivers["test"] = s.stubDriver
368
369         type instance struct {
370                 Instance             string
371                 WorkerState          string
372                 Price                float64
373                 LastContainerUUID    string
374                 ArvadosInstanceType  string
375                 ProviderInstanceType string
376         }
377         type instancesResponse struct {
378                 Items []instance
379         }
380         getInstances := func() instancesResponse {
381                 req := httptest.NewRequest("GET", "/arvados/v1/dispatch/instances", nil)
382                 req.Header.Set("Authorization", "Bearer abcdefgh")
383                 resp := httptest.NewRecorder()
384                 s.disp.ServeHTTP(resp, req)
385                 var sr instancesResponse
386                 c.Check(resp.Code, check.Equals, http.StatusOK)
387                 err := json.Unmarshal(resp.Body.Bytes(), &sr)
388                 c.Check(err, check.IsNil)
389                 return sr
390         }
391
392         sr := getInstances()
393         c.Check(len(sr.Items), check.Equals, 0)
394
395         ch := s.disp.pool.Subscribe()
396         defer s.disp.pool.Unsubscribe(ch)
397         err := s.disp.pool.Create(test.InstanceType(1))
398         c.Check(err, check.IsNil)
399         <-ch
400
401         sr = getInstances()
402         c.Assert(len(sr.Items), check.Equals, 1)
403         c.Check(sr.Items[0].Instance, check.Matches, "stub.*")
404         c.Check(sr.Items[0].WorkerState, check.Equals, "booting")
405         c.Check(sr.Items[0].Price, check.Equals, 0.123)
406         c.Check(sr.Items[0].LastContainerUUID, check.Equals, "")
407         c.Check(sr.Items[0].ProviderInstanceType, check.Equals, test.InstanceType(1).ProviderType)
408         c.Check(sr.Items[0].ArvadosInstanceType, check.Equals, test.InstanceType(1).Name)
409 }