20457: Exercise quota handling in dispatcher chaos test.
[arvados.git] / lib / dispatchcloud / test / stub_driver.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package test
6
7 import (
8         "crypto/rand"
9         "encoding/json"
10         "errors"
11         "fmt"
12         "io"
13         "io/ioutil"
14         math_rand "math/rand"
15         "regexp"
16         "strings"
17         "sync"
18         "time"
19
20         "git.arvados.org/arvados.git/lib/cloud"
21         "git.arvados.org/arvados.git/lib/crunchrun"
22         "git.arvados.org/arvados.git/sdk/go/arvados"
23         "github.com/sirupsen/logrus"
24         "golang.org/x/crypto/ssh"
25 )
26
27 // A StubDriver implements cloud.Driver by setting up local SSH
28 // servers that do fake command executions.
29 type StubDriver struct {
30         HostKey        ssh.Signer
31         AuthorizedKeys []ssh.PublicKey
32
33         // SetupVM, if set, is called upon creation of each new
34         // StubVM. This is the caller's opportunity to customize the
35         // VM's error rate and other behaviors.
36         SetupVM func(*StubVM)
37
38         // Bugf, if set, is called if a bug is detected in the caller
39         // or stub. Typically set to (*check.C)Errorf. If unset,
40         // logger.Warnf is called instead.
41         Bugf func(string, ...interface{})
42
43         // StubVM's fake crunch-run uses this Queue to read and update
44         // container state.
45         Queue *Queue
46
47         // Frequency of artificially introduced errors on calls to
48         // Create and Destroy. 0=always succeed, 1=always fail.
49         ErrorRateCreate  float64
50         ErrorRateDestroy float64
51
52         // If Create() or Instances() is called too frequently, return
53         // rate-limiting errors.
54         MinTimeBetweenCreateCalls    time.Duration
55         MinTimeBetweenInstancesCalls time.Duration
56
57         QuotaMaxInstances int
58
59         // If true, Create and Destroy calls block until Release() is
60         // called.
61         HoldCloudOps bool
62
63         instanceSets []*StubInstanceSet
64         holdCloudOps chan bool
65 }
66
67 // InstanceSet returns a new *StubInstanceSet.
68 func (sd *StubDriver) InstanceSet(params json.RawMessage, id cloud.InstanceSetID, _ cloud.SharedResourceTags, logger logrus.FieldLogger) (cloud.InstanceSet, error) {
69         if sd.holdCloudOps == nil {
70                 sd.holdCloudOps = make(chan bool)
71         }
72         sis := StubInstanceSet{
73                 driver:  sd,
74                 logger:  logger,
75                 servers: map[cloud.InstanceID]*StubVM{},
76         }
77         sd.instanceSets = append(sd.instanceSets, &sis)
78
79         var err error
80         if params != nil {
81                 err = json.Unmarshal(params, &sis)
82         }
83         return &sis, err
84 }
85
86 // InstanceSets returns all instances that have been created by the
87 // driver. This can be used to test a component that uses the driver
88 // but doesn't expose the InstanceSets it has created.
89 func (sd *StubDriver) InstanceSets() []*StubInstanceSet {
90         return sd.instanceSets
91 }
92
93 // ReleaseCloudOps releases n pending Create/Destroy calls. If there
94 // are fewer than n blocked calls pending, it waits for the rest to
95 // arrive.
96 func (sd *StubDriver) ReleaseCloudOps(n int) {
97         for i := 0; i < n; i++ {
98                 <-sd.holdCloudOps
99         }
100 }
101
102 type StubInstanceSet struct {
103         driver  *StubDriver
104         logger  logrus.FieldLogger
105         servers map[cloud.InstanceID]*StubVM
106         mtx     sync.RWMutex
107         stopped bool
108
109         allowCreateCall    time.Time
110         allowInstancesCall time.Time
111         lastInstanceID     int
112 }
113
114 func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, initCommand cloud.InitCommand, authKey ssh.PublicKey) (cloud.Instance, error) {
115         if sis.driver.HoldCloudOps {
116                 sis.driver.holdCloudOps <- true
117         }
118         sis.mtx.Lock()
119         defer sis.mtx.Unlock()
120         if sis.stopped {
121                 return nil, errors.New("StubInstanceSet: Create called after Stop")
122         }
123         if sis.allowCreateCall.After(time.Now()) {
124                 return nil, RateLimitError{sis.allowCreateCall}
125         }
126         if math_rand.Float64() < sis.driver.ErrorRateCreate {
127                 return nil, fmt.Errorf("StubInstanceSet: rand < ErrorRateCreate %f", sis.driver.ErrorRateCreate)
128         }
129         if max := sis.driver.QuotaMaxInstances; max > 0 && len(sis.servers) >= max {
130                 return nil, QuotaError{fmt.Errorf("StubInstanceSet: reached QuotaMaxInstances %d", max)}
131         }
132         sis.allowCreateCall = time.Now().Add(sis.driver.MinTimeBetweenCreateCalls)
133         ak := sis.driver.AuthorizedKeys
134         if authKey != nil {
135                 ak = append([]ssh.PublicKey{authKey}, ak...)
136         }
137         sis.lastInstanceID++
138         svm := &StubVM{
139                 InitCommand:  initCommand,
140                 sis:          sis,
141                 id:           cloud.InstanceID(fmt.Sprintf("inst%d,%s", sis.lastInstanceID, it.ProviderType)),
142                 tags:         copyTags(tags),
143                 providerType: it.ProviderType,
144                 running:      map[string]stubProcess{},
145                 killing:      map[string]bool{},
146         }
147         svm.SSHService = SSHService{
148                 HostKey:        sis.driver.HostKey,
149                 AuthorizedUser: "root",
150                 AuthorizedKeys: ak,
151                 Exec:           svm.Exec,
152         }
153         if setup := sis.driver.SetupVM; setup != nil {
154                 setup(svm)
155         }
156         sis.servers[svm.id] = svm
157         return svm.Instance(), nil
158 }
159
160 func (sis *StubInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
161         sis.mtx.RLock()
162         defer sis.mtx.RUnlock()
163         if sis.allowInstancesCall.After(time.Now()) {
164                 return nil, RateLimitError{sis.allowInstancesCall}
165         }
166         sis.allowInstancesCall = time.Now().Add(sis.driver.MinTimeBetweenInstancesCalls)
167         var r []cloud.Instance
168         for _, ss := range sis.servers {
169                 r = append(r, ss.Instance())
170         }
171         return r, nil
172 }
173
174 func (sis *StubInstanceSet) Stop() {
175         sis.mtx.Lock()
176         defer sis.mtx.Unlock()
177         if sis.stopped {
178                 panic("Stop called twice")
179         }
180         sis.stopped = true
181 }
182
183 func (sis *StubInstanceSet) StubVMs() (svms []*StubVM) {
184         sis.mtx.Lock()
185         defer sis.mtx.Unlock()
186         for _, vm := range sis.servers {
187                 svms = append(svms, vm)
188         }
189         return
190 }
191
// RateLimitError is returned by Create and Instances when they are
// called again before the driver's minimum interval has passed. Retry
// is the time from which the call is no longer rate-limited.
type RateLimitError struct{ Retry time.Time }

// Error reports when the rate limit expires.
func (e RateLimitError) Error() string {
	return fmt.Sprintf("rate limited until %s", e.Retry)
}

// EarliestRetry returns Retry, the time before which the call will
// keep being rejected.
func (e RateLimitError) EarliestRetry() time.Time {
	return e.Retry
}
196
197 // StubVM is a fake server that runs an SSH service. It represents a
198 // VM running in a fake cloud.
199 //
200 // Note this is distinct from a stubInstance, which is a snapshot of
201 // the VM's metadata. Like a VM in a real cloud, a StubVM keeps
202 // running (and might change IP addresses, shut down, etc.)  without
203 // updating any stubInstances that have been returned to callers.
204 type StubVM struct {
205         Boot                  time.Time
206         Broken                time.Time
207         ReportBroken          time.Time
208         CrunchRunMissing      bool
209         CrunchRunCrashRate    float64
210         CrunchRunDetachDelay  time.Duration
211         ArvMountMaxExitLag    time.Duration
212         ArvMountDeadlockRate  float64
213         ExecuteContainer      func(arvados.Container) int
214         CrashRunningContainer func(arvados.Container)
215         ExtraCrunchRunArgs    string // extra args expected after "crunch-run --detach --stdin-config "
216
217         // Populated by (*StubInstanceSet)Create()
218         InitCommand cloud.InitCommand
219
220         sis          *StubInstanceSet
221         id           cloud.InstanceID
222         tags         cloud.InstanceTags
223         providerType string
224         SSHService   SSHService
225         running      map[string]stubProcess
226         killing      map[string]bool
227         lastPID      int64
228         deadlocked   string
229         sync.Mutex
230 }
231
// stubProcess records one fake crunch-run process on a StubVM.
type stubProcess struct {
	// pid is the fake process ID assigned by StubVM.Exec.
	pid int64

	// exited is true once crunch-run has exited while arv-mount (or
	// something) still holds the lock in /var/run/; such entries are
	// reported as "stale" by "crunch-run --list".
	exited bool
}
239
240 func (svm *StubVM) Instance() stubInstance {
241         svm.Lock()
242         defer svm.Unlock()
243         return stubInstance{
244                 svm:  svm,
245                 addr: svm.SSHService.Address(),
246                 // We deliberately return a cached/stale copy of the
247                 // real tags here, so that (Instance)Tags() sometimes
248                 // returns old data after a call to
249                 // (Instance)SetTags().  This is permitted by the
250                 // driver interface, and this might help remind
251                 // callers that they need to tolerate it.
252                 tags: copyTags(svm.tags),
253         }
254 }
255
256 func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
257         stdinData, err := ioutil.ReadAll(stdin)
258         if err != nil {
259                 fmt.Fprintf(stderr, "error reading stdin: %s\n", err)
260                 return 1
261         }
262         queue := svm.sis.driver.Queue
263         uuid := regexp.MustCompile(`.{5}-dz642-.{15}`).FindString(command)
264         if eta := svm.Boot.Sub(time.Now()); eta > 0 {
265                 fmt.Fprintf(stderr, "stub is booting, ETA %s\n", eta)
266                 return 1
267         }
268         if !svm.Broken.IsZero() && svm.Broken.Before(time.Now()) {
269                 fmt.Fprintf(stderr, "cannot fork\n")
270                 return 2
271         }
272         if svm.CrunchRunMissing && strings.Contains(command, "crunch-run") {
273                 fmt.Fprint(stderr, "crunch-run: command not found\n")
274                 return 1
275         }
276         if strings.HasPrefix(command, "crunch-run --detach --stdin-config "+svm.ExtraCrunchRunArgs) {
277                 var configData crunchrun.ConfigData
278                 err := json.Unmarshal(stdinData, &configData)
279                 if err != nil {
280                         fmt.Fprintf(stderr, "unmarshal stdin: %s (stdin was: %q)\n", err, stdinData)
281                         return 1
282                 }
283                 for _, name := range []string{"ARVADOS_API_HOST", "ARVADOS_API_TOKEN"} {
284                         if configData.Env[name] == "" {
285                                 fmt.Fprintf(stderr, "%s env var missing from stdin %q\n", name, stdinData)
286                                 return 1
287                         }
288                 }
289                 svm.Lock()
290                 svm.lastPID++
291                 pid := svm.lastPID
292                 svm.running[uuid] = stubProcess{pid: pid}
293                 svm.Unlock()
294                 time.Sleep(svm.CrunchRunDetachDelay)
295                 fmt.Fprintf(stderr, "starting %s\n", uuid)
296                 logger := svm.sis.logger.WithFields(logrus.Fields{
297                         "Instance":      svm.id,
298                         "ContainerUUID": uuid,
299                         "PID":           pid,
300                 })
301                 logger.Printf("[test] starting crunch-run stub")
302                 go func() {
303                         var ctr arvados.Container
304                         var started, completed bool
305                         defer func() {
306                                 logger.Print("[test] exiting crunch-run stub")
307                                 svm.Lock()
308                                 defer svm.Unlock()
309                                 if svm.running[uuid].pid != pid {
310                                         bugf := svm.sis.driver.Bugf
311                                         if bugf == nil {
312                                                 bugf = logger.Warnf
313                                         }
314                                         bugf("[test] StubDriver bug or caller bug: pid %d exiting, running[%s].pid==%d", pid, uuid, svm.running[uuid].pid)
315                                         return
316                                 }
317                                 if !completed {
318                                         logger.WithField("State", ctr.State).Print("[test] crashing crunch-run stub")
319                                         if started && svm.CrashRunningContainer != nil {
320                                                 svm.CrashRunningContainer(ctr)
321                                         }
322                                 }
323                                 sproc := svm.running[uuid]
324                                 sproc.exited = true
325                                 svm.running[uuid] = sproc
326                                 svm.Unlock()
327                                 time.Sleep(svm.ArvMountMaxExitLag * time.Duration(math_rand.Float64()))
328                                 svm.Lock()
329                                 if math_rand.Float64() >= svm.ArvMountDeadlockRate {
330                                         delete(svm.running, uuid)
331                                 }
332                         }()
333
334                         crashluck := math_rand.Float64()
335                         wantCrash := crashluck < svm.CrunchRunCrashRate
336                         wantCrashEarly := crashluck < svm.CrunchRunCrashRate/2
337
338                         ctr, ok := queue.Get(uuid)
339                         if !ok {
340                                 logger.Print("[test] container not in queue")
341                                 return
342                         }
343
344                         time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
345
346                         svm.Lock()
347                         killed := svm.killing[uuid]
348                         svm.Unlock()
349                         if killed || wantCrashEarly {
350                                 return
351                         }
352
353                         ctr.State = arvados.ContainerStateRunning
354                         started = queue.Notify(ctr)
355                         if !started {
356                                 ctr, _ = queue.Get(uuid)
357                                 logger.Print("[test] erroring out because state=Running update was rejected")
358                                 return
359                         }
360
361                         if wantCrash {
362                                 logger.WithField("State", ctr.State).Print("[test] crashing crunch-run stub")
363                                 return
364                         }
365                         if svm.ExecuteContainer != nil {
366                                 ctr.ExitCode = svm.ExecuteContainer(ctr)
367                         }
368                         logger.WithField("ExitCode", ctr.ExitCode).Print("[test] completing container")
369                         ctr.State = arvados.ContainerStateComplete
370                         completed = queue.Notify(ctr)
371                 }()
372                 return 0
373         }
374         if command == "crunch-run --list" {
375                 svm.Lock()
376                 defer svm.Unlock()
377                 for uuid, sproc := range svm.running {
378                         if sproc.exited {
379                                 fmt.Fprintf(stdout, "%s stale\n", uuid)
380                         } else {
381                                 fmt.Fprintf(stdout, "%s\n", uuid)
382                         }
383                 }
384                 if !svm.ReportBroken.IsZero() && svm.ReportBroken.Before(time.Now()) {
385                         fmt.Fprintln(stdout, "broken")
386                 }
387                 fmt.Fprintln(stdout, svm.deadlocked)
388                 return 0
389         }
390         if strings.HasPrefix(command, "crunch-run --kill ") {
391                 svm.Lock()
392                 sproc, running := svm.running[uuid]
393                 if running && !sproc.exited {
394                         svm.killing[uuid] = true
395                         svm.Unlock()
396                         time.Sleep(time.Duration(math_rand.Float64()*2) * time.Millisecond)
397                         svm.Lock()
398                         sproc, running = svm.running[uuid]
399                 }
400                 svm.Unlock()
401                 if running && !sproc.exited {
402                         fmt.Fprintf(stderr, "%s: container is running\n", uuid)
403                         return 1
404                 }
405                 fmt.Fprintf(stderr, "%s: container is not running\n", uuid)
406                 return 0
407         }
408         if command == "true" {
409                 return 0
410         }
411         fmt.Fprintf(stderr, "%q: command not found", command)
412         return 1
413 }
414
415 type stubInstance struct {
416         svm  *StubVM
417         addr string
418         tags cloud.InstanceTags
419 }
420
421 func (si stubInstance) ID() cloud.InstanceID {
422         return si.svm.id
423 }
424
425 func (si stubInstance) Address() string {
426         return si.addr
427 }
428
429 func (si stubInstance) RemoteUser() string {
430         return si.svm.SSHService.AuthorizedUser
431 }
432
433 func (si stubInstance) Destroy() error {
434         sis := si.svm.sis
435         if sis.driver.HoldCloudOps {
436                 sis.driver.holdCloudOps <- true
437         }
438         if math_rand.Float64() < si.svm.sis.driver.ErrorRateDestroy {
439                 return errors.New("instance could not be destroyed")
440         }
441         si.svm.SSHService.Close()
442         sis.mtx.Lock()
443         defer sis.mtx.Unlock()
444         delete(sis.servers, si.svm.id)
445         return nil
446 }
447
448 func (si stubInstance) ProviderType() string {
449         return si.svm.providerType
450 }
451
452 func (si stubInstance) SetTags(tags cloud.InstanceTags) error {
453         tags = copyTags(tags)
454         svm := si.svm
455         go func() {
456                 svm.Lock()
457                 defer svm.Unlock()
458                 svm.tags = tags
459         }()
460         return nil
461 }
462
463 func (si stubInstance) Tags() cloud.InstanceTags {
464         // Return a copy to ensure a caller can't change our saved
465         // tags just by writing to the returned map.
466         return copyTags(si.tags)
467 }
468
469 func (si stubInstance) String() string {
470         return string(si.svm.id)
471 }
472
473 func (si stubInstance) VerifyHostKey(key ssh.PublicKey, client *ssh.Client) error {
474         buf := make([]byte, 512)
475         _, err := io.ReadFull(rand.Reader, buf)
476         if err != nil {
477                 return err
478         }
479         sig, err := si.svm.sis.driver.HostKey.Sign(rand.Reader, buf)
480         if err != nil {
481                 return err
482         }
483         return key.Verify(buf, sig)
484 }
485
486 func copyTags(src cloud.InstanceTags) cloud.InstanceTags {
487         dst := cloud.InstanceTags{}
488         for k, v := range src {
489                 dst[k] = v
490         }
491         return dst
492 }
493
494 func (si stubInstance) PriceHistory(arvados.InstanceType) []cloud.InstancePrice {
495         return nil
496 }
497
// QuotaError wraps an error to mark it as a quota error. Create
// returns one when QuotaMaxInstances has been reached. Its message is
// that of the wrapped error.
type QuotaError struct {
	error
}

// IsQuotaError reports whether this is a quota error; it always
// returns true.
func (qe QuotaError) IsQuotaError() bool {
	return true
}