14360: Remove redundant wkr.booted flag.
[arvados.git] / lib / dispatchcloud / dispatcher_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package dispatchcloud
6
7 import (
8         "encoding/json"
9         "fmt"
10         "io"
11         "io/ioutil"
12         "math/rand"
13         "net/http/httptest"
14         "os"
15         "regexp"
16         "strings"
17         "sync"
18         "time"
19
20         "git.curoverse.com/arvados.git/lib/cloud"
21         "git.curoverse.com/arvados.git/lib/dispatchcloud/test"
22         "git.curoverse.com/arvados.git/sdk/go/arvados"
23         "github.com/Sirupsen/logrus"
24         "golang.org/x/crypto/ssh"
25         check "gopkg.in/check.v1"
26 )
27
28 var _ = check.Suite(&DispatcherSuite{})
29
30 // fakeCloud provides an exec method that can be used as a
31 // test.StubExecFunc. It calls the provided makeVM func when called
32 // with a previously unseen instance ID. Calls to exec are passed on
33 // to the *fakeVM for the appropriate instance ID.
34 type fakeCloud struct {
35         queue      *test.Queue
36         makeVM     func(cloud.Instance) *fakeVM
37         onComplete func(string)
38         onCancel   func(string)
39         vms        map[cloud.InstanceID]*fakeVM
40         sync.Mutex
41 }
42
43 func (fc *fakeCloud) exec(inst cloud.Instance, command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
44         fc.Lock()
45         fvm, ok := fc.vms[inst.ID()]
46         if !ok {
47                 if fc.vms == nil {
48                         fc.vms = make(map[cloud.InstanceID]*fakeVM)
49                 }
50                 fvm = fc.makeVM(inst)
51                 fc.vms[inst.ID()] = fvm
52         }
53         fc.Unlock()
54         return fvm.exec(fc.queue, fc.onComplete, fc.onCancel, command, stdin, stdout, stderr)
55 }
56
// fakeVM is a fake VM with configurable delays and failure modes. Its
// exec method simulates running commands on the VM.
type fakeVM struct {
	// Behavior settings, assigned when the fakeVM is created.
	boot                 time.Time     // exec reports "booting" until this time
	broken               time.Time     // if non-zero, exec reports "cannot fork" after this time
	crunchRunMissing     bool          // commands mentioning crunch-run fail as "not found"
	crunchRunCrashRate   float64       // controls how often the crunch-run stub "crashes"
	crunchRunDetachDelay time.Duration // sleep before "crunch-run --detach" returns
	ctrExit              int           // exit code given to containers that complete

	// The embedded Mutex guards running.
	sync.Mutex
	// running holds UUIDs whose simulated crunch-run is still alive.
	running map[string]bool
	// completed is not used by the code in this file.
	completed []string
}
69
70 func (fvm *fakeVM) exec(queue *test.Queue, onComplete, onCancel func(uuid string), command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
71         uuid := regexp.MustCompile(`.{5}-dz642-.{15}`).FindString(command)
72         if eta := fvm.boot.Sub(time.Now()); eta > 0 {
73                 fmt.Fprintf(stderr, "stub is booting, ETA %s\n", eta)
74                 return 1
75         }
76         if !fvm.broken.IsZero() && fvm.broken.Before(time.Now()) {
77                 fmt.Fprintf(stderr, "cannot fork\n")
78                 return 2
79         }
80         if fvm.crunchRunMissing && strings.Contains(command, "crunch-run") {
81                 fmt.Fprint(stderr, "crunch-run: command not found\n")
82                 return 1
83         }
84         if strings.HasPrefix(command, "crunch-run --detach ") {
85                 fvm.Lock()
86                 if fvm.running == nil {
87                         fvm.running = map[string]bool{}
88                 }
89                 fvm.running[uuid] = true
90                 fvm.Unlock()
91                 time.Sleep(fvm.crunchRunDetachDelay)
92                 fmt.Fprintf(stderr, "starting %s\n", uuid)
93                 logger := logrus.WithField("ContainerUUID", uuid)
94                 logger.Printf("[test] starting crunch-run stub")
95                 go func() {
96                         crashluck := rand.Float64()
97                         ctr, ok := queue.Get(uuid)
98                         if !ok {
99                                 logger.Print("[test] container not in queue")
100                                 return
101                         }
102                         if crashluck > fvm.crunchRunCrashRate/2 {
103                                 time.Sleep(time.Duration(rand.Float64()*20) * time.Millisecond)
104                                 ctr.State = arvados.ContainerStateRunning
105                                 queue.Notify(ctr)
106                         }
107
108                         time.Sleep(time.Duration(rand.Float64()*20) * time.Millisecond)
109                         fvm.Lock()
110                         _, running := fvm.running[uuid]
111                         fvm.Unlock()
112                         if !running {
113                                 logger.Print("[test] container was killed")
114                                 return
115                         }
116                         if crashluck < fvm.crunchRunCrashRate {
117                                 logger.Print("[test] crashing crunch-run stub")
118                                 if onCancel != nil && ctr.State == arvados.ContainerStateRunning {
119                                         onCancel(uuid)
120                                 }
121                         } else {
122                                 ctr.State = arvados.ContainerStateComplete
123                                 ctr.ExitCode = fvm.ctrExit
124                                 queue.Notify(ctr)
125                                 if onComplete != nil {
126                                         onComplete(uuid)
127                                 }
128                         }
129                         logger.Print("[test] exiting crunch-run stub")
130                         fvm.Lock()
131                         defer fvm.Unlock()
132                         delete(fvm.running, uuid)
133                 }()
134                 return 0
135         }
136         if command == "crunch-run --list" {
137                 fvm.Lock()
138                 defer fvm.Unlock()
139                 for uuid := range fvm.running {
140                         fmt.Fprintf(stdout, "%s\n", uuid)
141                 }
142                 return 0
143         }
144         if strings.HasPrefix(command, "crunch-run --kill ") {
145                 fvm.Lock()
146                 defer fvm.Unlock()
147                 if fvm.running[uuid] {
148                         delete(fvm.running, uuid)
149                 } else {
150                         fmt.Fprintf(stderr, "%s: container is not running\n", uuid)
151                 }
152                 return 0
153         }
154         if command == "true" {
155                 return 0
156         }
157         fmt.Fprintf(stderr, "%q: command not found", command)
158         return 1
159 }
160
161 type DispatcherSuite struct {
162         cluster     *arvados.Cluster
163         instanceSet *test.LameInstanceSet
164         stubDriver  *test.StubDriver
165         disp        *dispatcher
166 }
167
168 func (s *DispatcherSuite) SetUpSuite(c *check.C) {
169         if os.Getenv("ARVADOS_DEBUG") != "" {
170                 logrus.StandardLogger().SetLevel(logrus.DebugLevel)
171         }
172 }
173
174 func (s *DispatcherSuite) SetUpTest(c *check.C) {
175         dispatchpub, _ := test.LoadTestKey(c, "test/sshkey_dispatch")
176         dispatchprivraw, err := ioutil.ReadFile("test/sshkey_dispatch")
177         c.Assert(err, check.IsNil)
178
179         _, hostpriv := test.LoadTestKey(c, "test/sshkey_vm")
180         s.stubDriver = &test.StubDriver{
181                 Exec: func(inst cloud.Instance, command string, _ io.Reader, _, _ io.Writer) uint32 {
182                         c.Logf("stubDriver SSHExecFunc(%s, %q, ...)", inst, command)
183                         return 1
184                 },
185                 HostKey:        hostpriv,
186                 AuthorizedKeys: []ssh.PublicKey{dispatchpub},
187         }
188
189         s.cluster = &arvados.Cluster{
190                 CloudVMs: arvados.CloudVMs{
191                         Driver:          "test",
192                         SyncInterval:    arvados.Duration(10 * time.Millisecond),
193                         TimeoutIdle:     arvados.Duration(30 * time.Millisecond),
194                         TimeoutBooting:  arvados.Duration(30 * time.Millisecond),
195                         TimeoutProbe:    arvados.Duration(15 * time.Millisecond),
196                         TimeoutShutdown: arvados.Duration(5 * time.Millisecond),
197                 },
198                 Dispatch: arvados.Dispatch{
199                         PrivateKey:         dispatchprivraw,
200                         PollInterval:       arvados.Duration(5 * time.Millisecond),
201                         ProbeInterval:      arvados.Duration(5 * time.Millisecond),
202                         StaleLockTimeout:   arvados.Duration(5 * time.Millisecond),
203                         MaxProbesPerSecond: 1000,
204                 },
205                 InstanceTypes: arvados.InstanceTypeMap{
206                         test.InstanceType(1).Name:  test.InstanceType(1),
207                         test.InstanceType(2).Name:  test.InstanceType(2),
208                         test.InstanceType(3).Name:  test.InstanceType(3),
209                         test.InstanceType(4).Name:  test.InstanceType(4),
210                         test.InstanceType(6).Name:  test.InstanceType(6),
211                         test.InstanceType(8).Name:  test.InstanceType(8),
212                         test.InstanceType(16).Name: test.InstanceType(16),
213                 },
214                 NodeProfiles: map[string]arvados.NodeProfile{
215                         "*": {
216                                 Controller:    arvados.SystemServiceInstance{Listen: os.Getenv("ARVADOS_API_HOST")},
217                                 DispatchCloud: arvados.SystemServiceInstance{Listen: ":"},
218                         },
219                 },
220         }
221         s.disp = &dispatcher{Cluster: s.cluster}
222         // Test cases can modify s.cluster before calling
223         // initialize(), and then modify private state before calling
224         // go run().
225 }
226
227 func (s *DispatcherSuite) TearDownTest(c *check.C) {
228         s.disp.Close()
229 }
230
231 func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
232         drivers["test"] = s.stubDriver
233         s.disp.setupOnce.Do(s.disp.initialize)
234         queue := &test.Queue{
235                 ChooseType: func(ctr *arvados.Container) (arvados.InstanceType, error) {
236                         return ChooseInstanceType(s.cluster, ctr)
237                 },
238         }
239         for i := 0; i < 200; i++ {
240                 queue.Containers = append(queue.Containers, arvados.Container{
241                         UUID:     test.ContainerUUID(i + 1),
242                         State:    arvados.ContainerStateQueued,
243                         Priority: int64(i%20 + 1),
244                         RuntimeConstraints: arvados.RuntimeConstraints{
245                                 RAM:   int64(i%3+1) << 30,
246                                 VCPUs: i%8 + 1,
247                         },
248                 })
249         }
250         s.disp.queue = queue
251
252         var mtx sync.Mutex
253         done := make(chan struct{})
254         waiting := map[string]struct{}{}
255         for _, ctr := range queue.Containers {
256                 waiting[ctr.UUID] = struct{}{}
257         }
258         onComplete := func(uuid string) {
259                 mtx.Lock()
260                 defer mtx.Unlock()
261                 if _, ok := waiting[uuid]; !ok {
262                         c.Errorf("container completed twice: %s", uuid)
263                 }
264                 delete(waiting, uuid)
265                 if len(waiting) == 0 {
266                         close(done)
267                 }
268         }
269         n := 0
270         fc := &fakeCloud{
271                 queue: queue,
272                 makeVM: func(inst cloud.Instance) *fakeVM {
273                         n++
274                         fvm := &fakeVM{
275                                 boot:                 time.Now().Add(time.Duration(rand.Int63n(int64(5 * time.Millisecond)))),
276                                 crunchRunDetachDelay: time.Duration(rand.Int63n(int64(10 * time.Millisecond))),
277                                 ctrExit:              int(rand.Uint32() & 0x3),
278                         }
279                         switch n % 7 {
280                         case 0:
281                                 fvm.broken = time.Now().Add(time.Duration(rand.Int63n(90)) * time.Millisecond)
282                         case 1:
283                                 fvm.crunchRunMissing = true
284                         default:
285                                 fvm.crunchRunCrashRate = 0.1
286                         }
287                         return fvm
288                 },
289                 onComplete: onComplete,
290                 onCancel:   onComplete,
291         }
292         s.stubDriver.Exec = fc.exec
293
294         start := time.Now()
295         go s.disp.run()
296         err := s.disp.CheckHealth()
297         c.Check(err, check.IsNil)
298
299         select {
300         case <-done:
301                 c.Logf("containers finished (%s), waiting for instances to shutdown and queue to clear", time.Since(start))
302         case <-time.After(10 * time.Second):
303                 c.Fatalf("timed out; still waiting for %d containers: %q", len(waiting), waiting)
304         }
305
306         deadline := time.Now().Add(time.Second)
307         for range time.NewTicker(10 * time.Millisecond).C {
308                 insts, err := s.stubDriver.InstanceSets()[0].Instances(nil)
309                 c.Check(err, check.IsNil)
310                 queue.Update()
311                 ents, _ := queue.Entries()
312                 if len(ents) == 0 && len(insts) == 0 {
313                         break
314                 }
315                 if time.Now().After(deadline) {
316                         c.Fatalf("timed out with %d containers (%v), %d instances (%+v)", len(ents), ents, len(insts), insts)
317                 }
318         }
319 }
320
321 func (s *DispatcherSuite) TestInstancesAPI(c *check.C) {
322         s.cluster.CloudVMs.TimeoutBooting = arvados.Duration(time.Second)
323         drivers["test"] = s.stubDriver
324
325         type instance struct {
326                 Instance             string
327                 WorkerState          string
328                 Price                float64
329                 LastContainerUUID    string
330                 ArvadosInstanceType  string
331                 ProviderInstanceType string
332         }
333         type instancesResponse struct {
334                 Items []instance
335         }
336         getInstances := func() instancesResponse {
337                 req := httptest.NewRequest("GET", "/arvados/v1/dispatch/instances", nil)
338                 resp := httptest.NewRecorder()
339                 s.disp.ServeHTTP(resp, req)
340                 var sr instancesResponse
341                 err := json.Unmarshal(resp.Body.Bytes(), &sr)
342                 c.Check(err, check.IsNil)
343                 return sr
344         }
345
346         sr := getInstances()
347         c.Check(len(sr.Items), check.Equals, 0)
348
349         ch := s.disp.pool.Subscribe()
350         defer s.disp.pool.Unsubscribe(ch)
351         err := s.disp.pool.Create(test.InstanceType(1))
352         c.Check(err, check.IsNil)
353         <-ch
354
355         sr = getInstances()
356         c.Assert(len(sr.Items), check.Equals, 1)
357         c.Check(sr.Items[0].Instance, check.Matches, "stub.*")
358         c.Check(sr.Items[0].WorkerState, check.Equals, "booting")
359         c.Check(sr.Items[0].Price, check.Equals, 0.123)
360         c.Check(sr.Items[0].LastContainerUUID, check.Equals, "")
361         c.Check(sr.Items[0].ProviderInstanceType, check.Equals, test.InstanceType(1).ProviderType)
362         c.Check(sr.Items[0].ArvadosInstanceType, check.Equals, test.InstanceType(1).Name)
363 }