21258: Fix "container completed twice" testing bug.
[arvados.git] lib/dispatchcloud/test/stub_driver.go
// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package test

import (
        "crypto/rand"
        "encoding/json"
        "errors"
        "fmt"
        "io"
        "io/ioutil"
        math_rand "math/rand"
        "regexp"
        "strings"
        "sync"
        "time"

        "git.arvados.org/arvados.git/lib/cloud"
        "git.arvados.org/arvados.git/lib/crunchrun"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
        "golang.org/x/crypto/ssh"
)

// A StubDriver implements cloud.Driver by setting up local SSH
// servers that do fake command executions.
type StubDriver struct {
        HostKey        ssh.Signer
        AuthorizedKeys []ssh.PublicKey

        // SetupVM, if set, is called upon creation of each new
        // StubVM. This is the caller's opportunity to customize the
        // VM's error rate and other behaviors.
        //
        // If SetupVM returns an error, that error will be returned to
        // the caller of Create(), and the new VM will be discarded.
        SetupVM func(*StubVM) error

        // Bugf, if set, is called when a bug is detected in the caller
        // or in the stub. Typically set to (*check.C).Errorf. If unset,
        // logger.Warnf is called instead.
        Bugf func(string, ...interface{})

        // StubVM's fake crunch-run uses this Queue to read and update
        // container state.
        Queue *Queue

        // Frequency of artificially introduced errors on calls to
        // Create and Destroy. 0=always succeed, 1=always fail.
        ErrorRateCreate  float64
        ErrorRateDestroy float64

        // If Create() or Instances() is called too frequently, return
        // rate-limiting errors.
        MinTimeBetweenCreateCalls    time.Duration
        MinTimeBetweenInstancesCalls time.Duration

        QuotaMaxInstances int

        // If true, Create and Destroy calls block until
        // ReleaseCloudOps() is called.
        HoldCloudOps bool

        instanceSets []*StubInstanceSet
        holdCloudOps chan bool
}
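
// A minimal sketch of how a test might configure a StubDriver (the
// field values below are illustrative, not taken from any real test):
//
//      driver := &test.StubDriver{
//              Queue:           queue, // a *test.Queue prepared by the test
//              ErrorRateCreate: 0.1,
//              SetupVM: func(vm *test.StubVM) error {
//                      vm.CrunchRunDetachDelay = time.Millisecond
//                      return nil
//              },
//      }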

// InstanceSet returns a new *StubInstanceSet.
func (sd *StubDriver) InstanceSet(params json.RawMessage, id cloud.InstanceSetID, _ cloud.SharedResourceTags, logger logrus.FieldLogger, reg *prometheus.Registry) (cloud.InstanceSet, error) {
        if sd.holdCloudOps == nil {
                sd.holdCloudOps = make(chan bool)
        }
        sis := StubInstanceSet{
                driver:  sd,
                logger:  logger,
                servers: map[cloud.InstanceID]*StubVM{},
        }
        sd.instanceSets = append(sd.instanceSets, &sis)

        var err error
        if params != nil {
                err = json.Unmarshal(params, &sis)
        }
        return &sis, err
}

// InstanceSets returns all instance sets that have been created by
// the driver. This can be used to test a component that uses the
// driver but doesn't expose the instance sets it has created.
func (sd *StubDriver) InstanceSets() []*StubInstanceSet {
        return sd.instanceSets
}

// ReleaseCloudOps releases n pending Create/Destroy calls. If there
// are fewer than n blocked calls pending, it waits for the rest to
// arrive.
func (sd *StubDriver) ReleaseCloudOps(n int) {
        for i := 0; i < n; i++ {
                <-sd.holdCloudOps
        }
}
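
// For example (illustrative only), a test that wants to make
// assertions while a cloud operation is still in flight might set
// HoldCloudOps before triggering the operation, then unblock it:
//
//      driver.HoldCloudOps = true
//      // ...code under test calls Create() and blocks...
//      // ...make assertions while the call is held, then:
//      driver.ReleaseCloudOps(1)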

type StubInstanceSet struct {
        driver  *StubDriver
        logger  logrus.FieldLogger
        servers map[cloud.InstanceID]*StubVM
        mtx     sync.RWMutex
        stopped bool

        allowCreateCall    time.Time
        allowInstancesCall time.Time
        lastInstanceID     int
}

func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, initCommand cloud.InitCommand, authKey ssh.PublicKey) (cloud.Instance, error) {
        if sis.driver.HoldCloudOps {
                sis.driver.holdCloudOps <- true
        }
        sis.mtx.Lock()
        defer sis.mtx.Unlock()
        if sis.stopped {
                return nil, errors.New("StubInstanceSet: Create called after Stop")
        }
        if sis.allowCreateCall.After(time.Now()) {
                return nil, RateLimitError{sis.allowCreateCall}
        }
        if math_rand.Float64() < sis.driver.ErrorRateCreate {
                return nil, fmt.Errorf("StubInstanceSet: rand < ErrorRateCreate %f", sis.driver.ErrorRateCreate)
        }
        if max := sis.driver.QuotaMaxInstances; max > 0 && len(sis.servers) >= max {
                return nil, QuotaError{fmt.Errorf("StubInstanceSet: reached QuotaMaxInstances %d", max)}
        }
        sis.allowCreateCall = time.Now().Add(sis.driver.MinTimeBetweenCreateCalls)
        ak := sis.driver.AuthorizedKeys
        if authKey != nil {
                ak = append([]ssh.PublicKey{authKey}, ak...)
        }
        sis.lastInstanceID++
        svm := &StubVM{
                InitCommand:  initCommand,
                sis:          sis,
                id:           cloud.InstanceID(fmt.Sprintf("inst%d,%s", sis.lastInstanceID, it.ProviderType)),
                tags:         copyTags(tags),
                providerType: it.ProviderType,
                running:      map[string]stubProcess{},
                killing:      map[string]bool{},
        }
        svm.SSHService = SSHService{
                HostKey:        sis.driver.HostKey,
                AuthorizedUser: "root",
                AuthorizedKeys: ak,
                Exec:           svm.Exec,
        }
        if setup := sis.driver.SetupVM; setup != nil {
                err := setup(svm)
                if err != nil {
                        return nil, err
                }
        }
        sis.servers[svm.id] = svm
        return svm.Instance(), nil
}

func (sis *StubInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
        sis.mtx.RLock()
        defer sis.mtx.RUnlock()
        if sis.allowInstancesCall.After(time.Now()) {
                return nil, RateLimitError{sis.allowInstancesCall}
        }
        sis.allowInstancesCall = time.Now().Add(sis.driver.MinTimeBetweenInstancesCalls)
        var r []cloud.Instance
        for _, ss := range sis.servers {
                r = append(r, ss.Instance())
        }
        return r, nil
}

func (sis *StubInstanceSet) Stop() {
        sis.mtx.Lock()
        defer sis.mtx.Unlock()
        if sis.stopped {
                panic("Stop called twice")
        }
        sis.stopped = true
}

func (sis *StubInstanceSet) StubVMs() (svms []*StubVM) {
        sis.mtx.Lock()
        defer sis.mtx.Unlock()
        for _, vm := range sis.servers {
                svms = append(svms, vm)
        }
        return
}

type RateLimitError struct{ Retry time.Time }

func (e RateLimitError) Error() string            { return fmt.Sprintf("rate limited until %s", e.Retry) }
func (e RateLimitError) EarliestRetry() time.Time { return e.Retry }

type CapacityError struct{ InstanceTypeSpecific bool }

func (e CapacityError) Error() string                { return "insufficient capacity" }
func (e CapacityError) IsCapacityError() bool        { return true }
func (e CapacityError) IsInstanceTypeSpecific() bool { return e.InstanceTypeSpecific }

// StubVM is a fake server that runs an SSH service. It represents a
// VM running in a fake cloud.
//
// Note this is distinct from a stubInstance, which is a snapshot of
// the VM's metadata. Like a VM in a real cloud, a StubVM keeps
// running (and might change IP addresses, shut down, etc.) without
// updating any stubInstances that have been returned to callers.
type StubVM struct {
        Boot                  time.Time
        Broken                time.Time
        ReportBroken          time.Time
        CrunchRunMissing      bool
        CrunchRunCrashRate    float64
        CrunchRunDetachDelay  time.Duration
        ArvMountMaxExitLag    time.Duration
        ArvMountDeadlockRate  float64
        ExecuteContainer      func(arvados.Container) int
        CrashRunningContainer func(arvados.Container)
        ExtraCrunchRunArgs    string // extra args expected after "crunch-run --detach --stdin-config "

        // Populated by (*StubInstanceSet).Create()
        InitCommand cloud.InitCommand

        sis          *StubInstanceSet
        id           cloud.InstanceID
        tags         cloud.InstanceTags
        providerType string
        SSHService   SSHService
        running      map[string]stubProcess
        killing      map[string]bool
        lastPID      int64
        deadlocked   string
        stubprocs    sync.WaitGroup
        destroying   bool
        sync.Mutex
}
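
// A sketch of how a StubDriver.SetupVM hook might customize these
// fields (the values shown are only examples):
//
//      driver.SetupVM = func(vm *test.StubVM) error {
//              vm.Boot = time.Now().Add(time.Millisecond)
//              vm.CrunchRunCrashRate = 0.1
//              vm.ExecuteContainer = func(ctr arvados.Container) int {
//                      return 0 // pretend the container ran and succeeded
//              }
//              return nil
//      }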

type stubProcess struct {
        pid int64

        // crunch-run has exited, but arv-mount process (or something)
        // still holds lock in /var/run/
        exited bool
}

func (svm *StubVM) Instance() stubInstance {
        svm.Lock()
        defer svm.Unlock()
        return stubInstance{
                svm:  svm,
                addr: svm.SSHService.Address(),
                // We deliberately return a cached/stale copy of the
                // real tags here, so that (Instance)Tags() sometimes
                // returns old data after a call to
                // (Instance)SetTags().  This is permitted by the
                // driver interface, and this might help remind
                // callers that they need to tolerate it.
                tags: copyTags(svm.tags),
        }
}

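// Exec implements the fake VM's SSH command handler and returns the
// command's exit status. It recognizes the commands used by the fake
// crunch-run protocol: "crunch-run --detach --stdin-config ..."
// starts a stub container process, "crunch-run --list" lists running
// and stale containers, "crunch-run --kill ..." stops one, and "true"
// always succeeds. Anything else fails with "command not found".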
func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
        // Ensure we don't start any new stubprocs after Destroy()
        // has started Wait()ing for stubprocs to end.
        svm.Lock()
        if svm.destroying {
                svm.Unlock()
                return 1
        }
        svm.stubprocs.Add(1)
        defer svm.stubprocs.Done()
        svm.Unlock()

        stdinData, err := ioutil.ReadAll(stdin)
        if err != nil {
                fmt.Fprintf(stderr, "error reading stdin: %s\n", err)
                return 1
        }
        queue := svm.sis.driver.Queue
        uuid := regexp.MustCompile(`.{5}-dz642-.{15}`).FindString(command)
        if eta := svm.Boot.Sub(time.Now()); eta > 0 {
                fmt.Fprintf(stderr, "stub is booting, ETA %s\n", eta)
                return 1
        }
        if !svm.Broken.IsZero() && svm.Broken.Before(time.Now()) {
                fmt.Fprintf(stderr, "cannot fork\n")
                return 2
        }
        if svm.CrunchRunMissing && strings.Contains(command, "crunch-run") {
                fmt.Fprint(stderr, "crunch-run: command not found\n")
                return 1
        }
        if strings.HasPrefix(command, "crunch-run --detach --stdin-config "+svm.ExtraCrunchRunArgs) {
                var configData crunchrun.ConfigData
                err := json.Unmarshal(stdinData, &configData)
                if err != nil {
                        fmt.Fprintf(stderr, "unmarshal stdin: %s (stdin was: %q)\n", err, stdinData)
                        return 1
                }
                for _, name := range []string{"ARVADOS_API_HOST", "ARVADOS_API_TOKEN"} {
                        if configData.Env[name] == "" {
                                fmt.Fprintf(stderr, "%s env var missing from stdin %q\n", name, stdinData)
                                return 1
                        }
                }
                svm.Lock()
                svm.lastPID++
                pid := svm.lastPID
                svm.running[uuid] = stubProcess{pid: pid}
                svm.Unlock()

                time.Sleep(svm.CrunchRunDetachDelay)

                svm.Lock()
                defer svm.Unlock()
                if svm.destroying {
                        fmt.Fprint(stderr, "crunch-run: killed by system shutdown\n")
                        return 9
                }
                fmt.Fprintf(stderr, "starting %s\n", uuid)
                logger := svm.sis.logger.WithFields(logrus.Fields{
                        "Instance":      svm.id,
                        "ContainerUUID": uuid,
                        "PID":           pid,
                })
                logger.Printf("[test] starting crunch-run stub")
                svm.stubprocs.Add(1)
                go func() {
                        defer svm.stubprocs.Done()
                        var ctr arvados.Container
                        var started, completed bool
                        defer func() {
                                logger.Print("[test] exiting crunch-run stub")
                                svm.Lock()
                                defer svm.Unlock()
                                if svm.destroying {
                                        return
                                }
                                if svm.running[uuid].pid != pid {
                                        bugf := svm.sis.driver.Bugf
                                        if bugf == nil {
                                                bugf = logger.Warnf
                                        }
                                        bugf("[test] StubDriver bug or caller bug: pid %d exiting, running[%s].pid==%d", pid, uuid, svm.running[uuid].pid)
                                        return
                                }
                                if !completed {
                                        logger.WithField("State", ctr.State).Print("[test] crashing crunch-run stub")
                                        if started && svm.CrashRunningContainer != nil {
                                                svm.CrashRunningContainer(ctr)
                                        }
                                }
                                sproc := svm.running[uuid]
                                sproc.exited = true
                                svm.running[uuid] = sproc
                                svm.Unlock()
                                time.Sleep(svm.ArvMountMaxExitLag * time.Duration(math_rand.Float64()))
                                svm.Lock()
                                if math_rand.Float64() >= svm.ArvMountDeadlockRate {
                                        delete(svm.running, uuid)
                                }
                        }()

                        crashluck := math_rand.Float64()
                        wantCrash := crashluck < svm.CrunchRunCrashRate
                        wantCrashEarly := crashluck < svm.CrunchRunCrashRate/2

                        ctr, ok := queue.Get(uuid)
                        if !ok {
                                logger.Print("[test] container not in queue")
                                return
                        }

                        time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)

                        svm.Lock()
                        killed := svm.killing[uuid]
                        delete(svm.killing, uuid)
                        destroying := svm.destroying
                        svm.Unlock()
                        if killed || wantCrashEarly || destroying {
                                return
                        }

                        ctr.State = arvados.ContainerStateRunning
                        started = queue.Notify(ctr)
                        if !started {
                                ctr, _ = queue.Get(uuid)
                                logger.Print("[test] erroring out because state=Running update was rejected")
                                return
                        }

                        if wantCrash {
                                logger.WithField("State", ctr.State).Print("[test] crashing crunch-run stub")
                                return
                        }
                        if svm.ExecuteContainer != nil {
                                ctr.ExitCode = svm.ExecuteContainer(ctr)
                        }
                        logger.WithField("ExitCode", ctr.ExitCode).Print("[test] completing container")
                        ctr.State = arvados.ContainerStateComplete
                        completed = queue.Notify(ctr)
                }()
                return 0
        }
        if command == "crunch-run --list" {
                svm.Lock()
                defer svm.Unlock()
                for uuid, sproc := range svm.running {
                        if sproc.exited {
                                fmt.Fprintf(stdout, "%s stale\n", uuid)
                        } else {
                                fmt.Fprintf(stdout, "%s\n", uuid)
                        }
                }
                if !svm.ReportBroken.IsZero() && svm.ReportBroken.Before(time.Now()) {
                        fmt.Fprintln(stdout, "broken")
                }
                fmt.Fprintln(stdout, svm.deadlocked)
                return 0
        }
        if strings.HasPrefix(command, "crunch-run --kill ") {
                svm.Lock()
                sproc, running := svm.running[uuid]
                if running && !sproc.exited {
                        svm.killing[uuid] = true
                        svm.Unlock()
                        time.Sleep(time.Duration(math_rand.Float64()*2) * time.Millisecond)
                        svm.Lock()
                        sproc, running = svm.running[uuid]
                }
                svm.Unlock()
                if running && !sproc.exited {
                        fmt.Fprintf(stderr, "%s: container is running\n", uuid)
                        return 1
                }
                fmt.Fprintf(stderr, "%s: container is not running\n", uuid)
                return 0
        }
        if command == "true" {
                return 0
        }
        fmt.Fprintf(stderr, "%q: command not found", command)
        return 1
}

type stubInstance struct {
        svm  *StubVM
        addr string
        tags cloud.InstanceTags
}

func (si stubInstance) ID() cloud.InstanceID {
        return si.svm.id
}

func (si stubInstance) Address() string {
        return si.addr
}

func (si stubInstance) RemoteUser() string {
        return si.svm.SSHService.AuthorizedUser
}

func (si stubInstance) Destroy() error {
        sis := si.svm.sis
        if sis.driver.HoldCloudOps {
                sis.driver.holdCloudOps <- true
        }
        if math_rand.Float64() < si.svm.sis.driver.ErrorRateDestroy {
                return errors.New("instance could not be destroyed")
        }
        si.svm.Lock()
        si.svm.destroying = true
        si.svm.Unlock()
        si.svm.stubprocs.Wait()
        si.svm.SSHService.Close()
        sis.mtx.Lock()
        defer sis.mtx.Unlock()
        delete(sis.servers, si.svm.id)
        return nil
}

func (si stubInstance) ProviderType() string {
        return si.svm.providerType
}

func (si stubInstance) SetTags(tags cloud.InstanceTags) error {
        tags = copyTags(tags)
        svm := si.svm
        go func() {
                svm.Lock()
                defer svm.Unlock()
                svm.tags = tags
        }()
        return nil
}

func (si stubInstance) Tags() cloud.InstanceTags {
        // Return a copy to ensure a caller can't change our saved
        // tags just by writing to the returned map.
        return copyTags(si.tags)
}

func (si stubInstance) String() string {
        return string(si.svm.id)
}

func (si stubInstance) VerifyHostKey(key ssh.PublicKey, client *ssh.Client) error {
        buf := make([]byte, 512)
        _, err := io.ReadFull(rand.Reader, buf)
        if err != nil {
                return err
        }
        sig, err := si.svm.sis.driver.HostKey.Sign(rand.Reader, buf)
        if err != nil {
                return err
        }
        return key.Verify(buf, sig)
}

func copyTags(src cloud.InstanceTags) cloud.InstanceTags {
        dst := cloud.InstanceTags{}
        for k, v := range src {
                dst[k] = v
        }
        return dst
}

func (si stubInstance) PriceHistory(arvados.InstanceType) []cloud.InstancePrice {
        return nil
}

type QuotaError struct {
        error
}

func (QuotaError) IsQuotaError() bool { return true }
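
// A sketch (not taken from the Arvados source) of how calling code
// might distinguish the error types defined above using only the
// methods declared in this file, without assuming any particular
// exported interfaces elsewhere:
//
//      if qe, ok := err.(interface{ IsQuotaError() bool }); ok && qe.IsQuotaError() {
//              // over quota: hold off until capacity is released
//      } else if re, ok := err.(interface{ EarliestRetry() time.Time }); ok {
//              _ = re.EarliestRetry() // rate limited: retry after this time
//      }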